diff options
Diffstat (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/server.ts')
| -rw-r--r-- | frontend-old/node_modules/@grpc/grpc-js/src/server.ts | 1300 |
1 files changed, 1300 insertions, 0 deletions
diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/server.ts b/frontend-old/node_modules/@grpc/grpc-js/src/server.ts new file mode 100644 index 0000000..0d8e082 --- /dev/null +++ b/frontend-old/node_modules/@grpc/grpc-js/src/server.ts @@ -0,0 +1,1300 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as http2 from 'http2'; +import { AddressInfo } from 'net'; + +import { ServiceError } from './call'; +import { Status, LogVerbosity } from './constants'; +import { Deserialize, Serialize, ServiceDefinition } from './make-client'; +import { Metadata } from './metadata'; +import { + BidiStreamingHandler, + ClientStreamingHandler, + HandleCall, + Handler, + HandlerType, + Http2ServerCallStream, + sendUnaryData, + ServerDuplexStream, + ServerDuplexStreamImpl, + ServerReadableStream, + ServerReadableStreamImpl, + ServerStreamingHandler, + ServerUnaryCall, + ServerUnaryCallImpl, + ServerWritableStream, + ServerWritableStreamImpl, + UnaryHandler, + ServerErrorResponse, + ServerStatusResponse, +} from './server-call'; +import { ServerCredentials } from './server-credentials'; +import { ChannelOptions } from './channel-options'; +import { + createResolver, + ResolverListener, + mapUriDefaultScheme, +} from './resolver'; +import * as logging from './logging'; +import { + SubchannelAddress, + TcpSubchannelAddress, + isTcpSubchannelAddress, + subchannelAddressToString, + stringToSubchannelAddress, +} from './subchannel-address'; +import { parseUri } from './uri-parser'; +import { + ChannelzCallTracker, + ChannelzChildrenTracker, + ChannelzTrace, + registerChannelzServer, + registerChannelzSocket, + ServerInfo, + ServerRef, + SocketInfo, + SocketRef, + TlsInfo, + unregisterChannelzRef, +} from './channelz'; +import { CipherNameAndProtocol, TLSSocket } from 'tls'; + +const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); +const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); +const KEEPALIVE_TIMEOUT_MS = 20000; + +const { HTTP2_HEADER_PATH } = http2.constants; + +const TRACER_NAME = 'server'; + +interface BindResult { + port: number; + count: number; +} + +function noop(): void {} + +function getUnimplementedStatusResponse( + methodName: string +): Partial<ServiceError> { + return { + code: Status.UNIMPLEMENTED, + details: `The server does not implement the method ${methodName}`, + }; +} + +/* eslint-disable @typescript-eslint/no-explicit-any */ +type UntypedUnaryHandler = UnaryHandler<any, any>; +type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>; +type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>; +type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>; +export type UntypedHandleCall = HandleCall<any, any>; +type UntypedHandler = Handler<any, any>; +export interface UntypedServiceImplementation { + [name: string]: UntypedHandleCall; +} + +function getDefaultHandler(handlerType: HandlerType, methodName: string) { + const unimplementedStatusResponse = + getUnimplementedStatusResponse(methodName); + switch (handlerType) { + case 'unary': + return ( + call: ServerUnaryCall<any, any>, + callback: sendUnaryData<any> + ) => { + callback(unimplementedStatusResponse as ServiceError, null); + }; + case 'clientStream': + return ( + call: ServerReadableStream<any, any>, + callback: sendUnaryData<any> + ) => { + callback(unimplementedStatusResponse as ServiceError, null); + }; + case 'serverStream': + return (call: ServerWritableStream<any, any>) => { + call.emit('error', unimplementedStatusResponse); + }; + case 'bidi': + return (call: ServerDuplexStream<any, any>) => { + call.emit('error', unimplementedStatusResponse); + }; + default: + throw new Error(`Invalid handlerType ${handlerType}`); + } +} + +interface ChannelzSessionInfo { + ref: SocketRef; + streamTracker: ChannelzCallTracker; + messagesSent: number; + messagesReceived: number; + lastMessageSentTimestamp: Date | null; + lastMessageReceivedTimestamp: Date | null; +} + +export class Server { + private http2ServerList: { + server: http2.Http2Server | http2.Http2SecureServer; + channelzRef: SocketRef; + }[] = []; + + private handlers: Map<string, UntypedHandler> = new Map< + string, + UntypedHandler + >(); + private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>(); + private started = false; + private shutdown = false; + private options: ChannelOptions; + private serverAddressString = 'null'; + + // Channelz Info + private readonly channelzEnabled: boolean = true; + private channelzRef: ServerRef; + private channelzTrace = new ChannelzTrace(); + private callTracker = new ChannelzCallTracker(); + private listenerChildrenTracker = new ChannelzChildrenTracker(); + private sessionChildrenTracker = new ChannelzChildrenTracker(); + + private readonly maxConnectionAgeMs: number; + private readonly maxConnectionAgeGraceMs: number; + + private readonly keepaliveTimeMs: number; + private readonly keepaliveTimeoutMs: number; + + constructor(options?: ChannelOptions) { + this.options = options ?? {}; + if (this.options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + this.channelzRef = registerChannelzServer( + () => this.getChannelzInfo(), + this.channelzEnabled + ); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Server created'); + } + this.maxConnectionAgeMs = + this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS; + this.maxConnectionAgeGraceMs = + this.options['grpc.max_connection_age_grace_ms'] ?? + UNLIMITED_CONNECTION_AGE_MS; + this.keepaliveTimeMs = + this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS; + this.keepaliveTimeoutMs = + this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; + this.trace('Server constructed'); + } + + private getChannelzInfo(): ServerInfo { + return { + trace: this.channelzTrace, + callTracker: this.callTracker, + listenerChildren: this.listenerChildrenTracker.getChildLists(), + sessionChildren: this.sessionChildrenTracker.getChildLists(), + }; + } + + private getChannelzSessionInfoGetter( + session: http2.ServerHttp2Session + ): () => SocketInfo { + return () => { + const sessionInfo = this.sessions.get(session)!; + const sessionSocket = session.socket; + const remoteAddress = sessionSocket.remoteAddress + ? stringToSubchannelAddress( + sessionSocket.remoteAddress, + sessionSocket.remotePort + ) + : null; + const localAddress = sessionSocket.localAddress + ? stringToSubchannelAddress( + sessionSocket.localAddress!, + sessionSocket.localPort + ) + : null; + let tlsInfo: TlsInfo | null; + if (session.encrypted) { + const tlsSocket: TLSSocket = sessionSocket as TLSSocket; + const cipherInfo: CipherNameAndProtocol & { standardName?: string } = + tlsSocket.getCipher(); + const certificate = tlsSocket.getCertificate(); + const peerCertificate = tlsSocket.getPeerCertificate(); + tlsInfo = { + cipherSuiteStandardName: cipherInfo.standardName ?? null, + cipherSuiteOtherName: cipherInfo.standardName + ? null + : cipherInfo.name, + localCertificate: + certificate && 'raw' in certificate ? certificate.raw : null, + remoteCertificate: + peerCertificate && 'raw' in peerCertificate + ? peerCertificate.raw + : null, + }; + } else { + tlsInfo = null; + } + const socketInfo: SocketInfo = { + remoteAddress: remoteAddress, + localAddress: localAddress, + security: tlsInfo, + remoteName: null, + streamsStarted: sessionInfo.streamTracker.callsStarted, + streamsSucceeded: sessionInfo.streamTracker.callsSucceeded, + streamsFailed: sessionInfo.streamTracker.callsFailed, + messagesSent: sessionInfo.messagesSent, + messagesReceived: sessionInfo.messagesReceived, + keepAlivesSent: 0, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: + sessionInfo.streamTracker.lastCallStartedTimestamp, + lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp, + lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp, + localFlowControlWindow: session.state.localWindowSize ?? null, + remoteFlowControlWindow: session.state.remoteWindowSize ?? null, + }; + return socketInfo; + }; + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '(' + this.channelzRef.id + ') ' + text + ); + } + + addProtoService(): never { + throw new Error('Not implemented. Use addService() instead'); + } + + addService( + service: ServiceDefinition, + implementation: UntypedServiceImplementation + ): void { + if ( + service === null || + typeof service !== 'object' || + implementation === null || + typeof implementation !== 'object' + ) { + throw new Error('addService() requires two objects as arguments'); + } + + const serviceKeys = Object.keys(service); + + if (serviceKeys.length === 0) { + throw new Error('Cannot add an empty service to a server'); + } + + serviceKeys.forEach(name => { + const attrs = service[name]; + let methodType: HandlerType; + + if (attrs.requestStream) { + if (attrs.responseStream) { + methodType = 'bidi'; + } else { + methodType = 'clientStream'; + } + } else { + if (attrs.responseStream) { + methodType = 'serverStream'; + } else { + methodType = 'unary'; + } + } + + let implFn = implementation[name]; + let impl; + + if (implFn === undefined && typeof attrs.originalName === 'string') { + implFn = implementation[attrs.originalName]; + } + + if (implFn !== undefined) { + impl = implFn.bind(implementation); + } else { + impl = getDefaultHandler(methodType, name); + } + + const success = this.register( + attrs.path, + impl as UntypedHandleCall, + attrs.responseSerialize, + attrs.requestDeserialize, + methodType + ); + + if (success === false) { + throw new Error(`Method handler for ${attrs.path} already provided.`); + } + }); + } + + removeService(service: ServiceDefinition): void { + if (service === null || typeof service !== 'object') { + throw new Error('removeService() requires object as argument'); + } + + const serviceKeys = Object.keys(service); + serviceKeys.forEach(name => { + const attrs = service[name]; + this.unregister(attrs.path); + }); + } + + bind(port: string, creds: ServerCredentials): never { + throw new Error('Not implemented. Use bindAsync() instead'); + } + + bindAsync( + port: string, + creds: ServerCredentials, + callback: (error: Error | null, port: number) => void + ): void { + if (this.started === true) { + throw new Error('server is already started'); + } + + if (this.shutdown) { + throw new Error('bindAsync called after shutdown'); + } + + if (typeof port !== 'string') { + throw new TypeError('port must be a string'); + } + + if (creds === null || !(creds instanceof ServerCredentials)) { + throw new TypeError('creds must be a ServerCredentials object'); + } + + if (typeof callback !== 'function') { + throw new TypeError('callback must be a function'); + } + + const initialPortUri = parseUri(port); + if (initialPortUri === null) { + throw new Error(`Could not parse port "${port}"`); + } + const portUri = mapUriDefaultScheme(initialPortUri); + if (portUri === null) { + throw new Error(`Could not get a default scheme for port "${port}"`); + } + + const serverOptions: http2.ServerOptions = { + maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, + }; + if ('grpc-node.max_session_memory' in this.options) { + serverOptions.maxSessionMemory = + this.options['grpc-node.max_session_memory']; + } else { + /* By default, set a very large max session memory limit, to effectively + * disable enforcement of the limit. Some testing indicates that Node's + * behavior degrades badly when this limit is reached, so we solve that + * by disabling the check entirely. */ + serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER; + } + if ('grpc.max_concurrent_streams' in this.options) { + serverOptions.settings = { + maxConcurrentStreams: this.options['grpc.max_concurrent_streams'], + }; + } + + const deferredCallback = (error: Error | null, port: number) => { + process.nextTick(() => callback(error, port)); + }; + + const setupServer = (): http2.Http2Server | http2.Http2SecureServer => { + let http2Server: http2.Http2Server | http2.Http2SecureServer; + if (creds._isSecure()) { + const secureServerOptions = Object.assign( + serverOptions, + creds._getSettings()! + ); + secureServerOptions.enableTrace = + this.options['grpc-node.tls_enable_trace'] === 1; + http2Server = http2.createSecureServer(secureServerOptions); + http2Server.on('secureConnection', (socket: TLSSocket) => { + /* These errors need to be handled by the user of Http2SecureServer, + * according to https://github.com/nodejs/node/issues/35824 */ + socket.on('error', (e: Error) => { + this.trace( + 'An incoming TLS connection closed with error: ' + e.message + ); + }); + }); + } else { + http2Server = http2.createServer(serverOptions); + } + + http2Server.setTimeout(0, noop); + this._setupHandlers(http2Server); + return http2Server; + }; + + const bindSpecificPort = ( + addressList: SubchannelAddress[], + portNum: number, + previousCount: number + ): Promise<BindResult> => { + if (addressList.length === 0) { + return Promise.resolve({ port: portNum, count: previousCount }); + } + return Promise.all( + addressList.map(address => { + this.trace( + 'Attempting to bind ' + subchannelAddressToString(address) + ); + let addr: SubchannelAddress; + if (isTcpSubchannelAddress(address)) { + addr = { + host: (address as TcpSubchannelAddress).host, + port: portNum, + }; + } else { + addr = address; + } + + const http2Server = setupServer(); + return new Promise<number | Error>((resolve, reject) => { + const onError = (err: Error) => { + this.trace( + 'Failed to bind ' + + subchannelAddressToString(address) + + ' with error ' + + err.message + ); + resolve(err); + }; + + http2Server.once('error', onError); + + http2Server.listen(addr, () => { + if (this.shutdown) { + http2Server.close(); + resolve(new Error('bindAsync failed because server is shutdown')); + return; + } + const boundAddress = http2Server.address()!; + let boundSubchannelAddress: SubchannelAddress; + if (typeof boundAddress === 'string') { + boundSubchannelAddress = { + path: boundAddress, + }; + } else { + boundSubchannelAddress = { + host: boundAddress.address, + port: boundAddress.port, + }; + } + + const channelzRef = registerChannelzSocket( + subchannelAddressToString(boundSubchannelAddress), + () => { + return { + localAddress: boundSubchannelAddress, + remoteAddress: null, + security: null, + remoteName: null, + streamsStarted: 0, + streamsSucceeded: 0, + streamsFailed: 0, + messagesSent: 0, + messagesReceived: 0, + keepAlivesSent: 0, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + localFlowControlWindow: null, + remoteFlowControlWindow: null, + }; + }, + this.channelzEnabled + ); + if (this.channelzEnabled) { + this.listenerChildrenTracker.refChild(channelzRef); + } + this.http2ServerList.push({ + server: http2Server, + channelzRef: channelzRef, + }); + this.trace( + 'Successfully bound ' + + subchannelAddressToString(boundSubchannelAddress) + ); + resolve( + 'port' in boundSubchannelAddress + ? boundSubchannelAddress.port + : portNum + ); + http2Server.removeListener('error', onError); + }); + }); + }) + ).then(results => { + let count = 0; + for (const result of results) { + if (typeof result === 'number') { + count += 1; + if (result !== portNum) { + throw new Error( + 'Invalid state: multiple port numbers added from single address' + ); + } + } + } + return { + port: portNum, + count: count + previousCount, + }; + }); + }; + + const bindWildcardPort = ( + addressList: SubchannelAddress[] + ): Promise<BindResult> => { + if (addressList.length === 0) { + return Promise.resolve<BindResult>({ port: 0, count: 0 }); + } + const address = addressList[0]; + const http2Server = setupServer(); + return new Promise<BindResult>((resolve, reject) => { + const onError = (err: Error) => { + this.trace( + 'Failed to bind ' + + subchannelAddressToString(address) + + ' with error ' + + err.message + ); + resolve(bindWildcardPort(addressList.slice(1))); + }; + + http2Server.once('error', onError); + + http2Server.listen(address, () => { + if (this.shutdown) { + http2Server.close(); + resolve({port: 0, count: 0}); + return; + } + const boundAddress = http2Server.address() as AddressInfo; + const boundSubchannelAddress: SubchannelAddress = { + host: boundAddress.address, + port: boundAddress.port, + }; + const channelzRef = registerChannelzSocket( + subchannelAddressToString(boundSubchannelAddress), + () => { + return { + localAddress: boundSubchannelAddress, + remoteAddress: null, + security: null, + remoteName: null, + streamsStarted: 0, + streamsSucceeded: 0, + streamsFailed: 0, + messagesSent: 0, + messagesReceived: 0, + keepAlivesSent: 0, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + localFlowControlWindow: null, + remoteFlowControlWindow: null, + }; + }, + this.channelzEnabled + ); + if (this.channelzEnabled) { + this.listenerChildrenTracker.refChild(channelzRef); + } + this.http2ServerList.push({ + server: http2Server, + channelzRef: channelzRef, + }); + this.trace( + 'Successfully bound ' + + subchannelAddressToString(boundSubchannelAddress) + ); + resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1)); + http2Server.removeListener('error', onError); + }); + }); + }; + + const resolverListener: ResolverListener = { + onSuccessfulResolution: ( + addressList, + serviceConfig, + serviceConfigError + ) => { + // We only want one resolution result. Discard all future results + resolverListener.onSuccessfulResolution = () => {}; + if (this.shutdown) { + deferredCallback( + new Error(`bindAsync failed because server is shutdown`), + 0 + ); + } + if (addressList.length === 0) { + deferredCallback( + new Error(`No addresses resolved for port ${port}`), + 0 + ); + return; + } + let bindResultPromise: Promise<BindResult>; + if (isTcpSubchannelAddress(addressList[0])) { + if (addressList[0].port === 0) { + bindResultPromise = bindWildcardPort(addressList); + } else { + bindResultPromise = bindSpecificPort( + addressList, + addressList[0].port, + 0 + ); + } + } else { + // Use an arbitrary non-zero port for non-TCP addresses + bindResultPromise = bindSpecificPort(addressList, 1, 0); + } + bindResultPromise.then( + bindResult => { + if (bindResult.count === 0) { + const errorString = `No address added out of total ${addressList.length} resolved`; + logging.log(LogVerbosity.ERROR, errorString); + deferredCallback(new Error(errorString), 0); + } else { + if (bindResult.count < addressList.length) { + logging.log( + LogVerbosity.INFO, + `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved` + ); + } + deferredCallback(null, bindResult.port); + } + }, + error => { + const errorString = `No address added out of total ${addressList.length} resolved`; + logging.log(LogVerbosity.ERROR, errorString); + deferredCallback(new Error(errorString), 0); + } + ); + }, + onError: error => { + deferredCallback(new Error(error.details), 0); + }, + }; + + const resolver = createResolver(portUri, resolverListener, this.options); + resolver.updateResolution(); + } + + forceShutdown(): void { + // Close the server if it is still running. + + for (const { server: http2Server, channelzRef: ref } of this + .http2ServerList) { + if (http2Server.listening) { + http2Server.close(() => { + if (this.channelzEnabled) { + this.listenerChildrenTracker.unrefChild(ref); + unregisterChannelzRef(ref); + } + }); + } + } + + this.started = false; + this.shutdown = true; + + // Always destroy any available sessions. It's possible that one or more + // tryShutdown() calls are in progress. Don't wait on them to finish. + this.sessions.forEach((channelzInfo, session) => { + // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to + // recognize destroy(code) as a valid signature. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + session.destroy(http2.constants.NGHTTP2_CANCEL as any); + }); + this.sessions.clear(); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } + } + + register<RequestType, ResponseType>( + name: string, + handler: HandleCall<RequestType, ResponseType>, + serialize: Serialize<ResponseType>, + deserialize: Deserialize<RequestType>, + type: string + ): boolean { + if (this.handlers.has(name)) { + return false; + } + + this.handlers.set(name, { + func: handler, + serialize, + deserialize, + type, + path: name, + } as UntypedHandler); + return true; + } + + unregister(name: string): boolean { + return this.handlers.delete(name); + } + + start(): void { + if ( + this.http2ServerList.length === 0 || + this.http2ServerList.every( + ({ server: http2Server }) => http2Server.listening !== true + ) + ) { + throw new Error('server must be bound in order to start'); + } + + if (this.started === true) { + throw new Error('server is already started'); + } + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Starting'); + } + this.started = true; + } + + tryShutdown(callback: (error?: Error) => void): void { + const wrappedCallback = (error?: Error) => { + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } + callback(error); + }; + let pendingChecks = 0; + + function maybeCallback(): void { + pendingChecks--; + + if (pendingChecks === 0) { + wrappedCallback(); + } + } + + // Close the server if necessary. + this.started = false; + this.shutdown = true; + + for (const { server: http2Server, channelzRef: ref } of this + .http2ServerList) { + if (http2Server.listening) { + pendingChecks++; + http2Server.close(() => { + if (this.channelzEnabled) { + this.listenerChildrenTracker.unrefChild(ref); + unregisterChannelzRef(ref); + } + maybeCallback(); + }); + } + } + + this.sessions.forEach((channelzInfo, session) => { + if (!session.closed) { + pendingChecks += 1; + session.close(maybeCallback); + } + }); + if (pendingChecks === 0) { + wrappedCallback(); + } + } + + addHttp2Port(): never { + throw new Error('Not yet implemented'); + } + + /** + * Get the channelz reference object for this server. The returned value is + * garbage if channelz is disabled for this server. + * @returns + */ + getChannelzRef() { + return this.channelzRef; + } + + private _verifyContentType( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ): boolean { + const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; + + if ( + typeof contentType !== 'string' || + !contentType.startsWith('application/grpc') + ) { + stream.respond( + { + [http2.constants.HTTP2_HEADER_STATUS]: + http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, + }, + { endStream: true } + ); + return false; + } + + return true; + } + + private _retrieveHandler(path: string): Handler<any, any> | null { + this.trace( + 'Received call to method ' + + path + + ' at address ' + + this.serverAddressString + ); + + const handler = this.handlers.get(path); + + if (handler === undefined) { + this.trace( + 'No handler registered for method ' + + path + + '. Sending UNIMPLEMENTED status.' + ); + return null; + } + + return handler; + } + + private _respondWithError<T extends Partial<ServiceError>>( + err: T, + stream: http2.ServerHttp2Stream, + channelzSessionInfo: ChannelzSessionInfo | null = null + ) { + const call = new Http2ServerCallStream(stream, null!, this.options); + + if (err.code === undefined) { + err.code = Status.INTERNAL; + } + + if (this.channelzEnabled) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + } + + call.sendError(err); + } + + private _channelzHandler( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) { + const channelzSessionInfo = this.sessions.get( + stream.session as http2.ServerHttp2Session + ); + + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + channelzSessionInfo + ); + return; + } + + const call = new Http2ServerCallStream(stream, handler, this.options); + + call.once('callEnd', (code: Status) => { + if (code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }); + + if (channelzSessionInfo) { + call.once('streamEnd', (success: boolean) => { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); + } else { + channelzSessionInfo.streamTracker.addCallFailed(); + } + }); + call.on('sendMessage', () => { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + }); + call.on('receiveMessage', () => { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + }); + } + + if (!this._runHandlerForCall(call, handler, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + + call.sendError({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + } + + private _streamHandler( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) { + if (this._verifyContentType(stream, headers) !== true) { + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + null + ); + return; + } + + const call = new Http2ServerCallStream(stream, handler, this.options); + if (!this._runHandlerForCall(call, handler, headers)) { + call.sendError({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + } + + private _runHandlerForCall( + call: Http2ServerCallStream<any, any>, + handler: Handler<any, any>, + headers: http2.IncomingHttpHeaders + ): boolean { + const metadata = call.receiveMetadata(headers); + const encoding = + (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity'; + metadata.remove('grpc-encoding'); + + const { type } = handler; + if (type === 'unary') { + handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding); + } else if (type === 'clientStream') { + handleClientStreaming( + call, + handler as UntypedClientStreamingHandler, + metadata, + encoding + ); + } else if (type === 'serverStream') { + handleServerStreaming( + call, + handler as UntypedServerStreamingHandler, + metadata, + encoding + ); + } else if (type === 'bidi') { + handleBidiStreaming( + call, + handler as UntypedBidiStreamingHandler, + metadata, + encoding + ); + } else { + return false; + } + + return true; + } + + private _setupHandlers( + http2Server: http2.Http2Server | http2.Http2SecureServer + ): void { + if (http2Server === null) { + return; + } + + const serverAddress = http2Server.address(); + let serverAddressString = 'null'; + if (serverAddress) { + if (typeof serverAddress === 'string') { + serverAddressString = serverAddress; + } else { + serverAddressString = serverAddress.address + ':' + serverAddress.port; + } + } + this.serverAddressString = serverAddressString; + + const handler = this.channelzEnabled + ? this._channelzHandler + : this._streamHandler; + + http2Server.on('stream', handler.bind(this)); + http2Server.on('session', session => { + if (!this.started) { + session.destroy(); + return; + } + + const channelzRef = registerChannelzSocket( + session.socket.remoteAddress ?? 'unknown', + this.getChannelzSessionInfoGetter(session), + this.channelzEnabled + ); + + const channelzSessionInfo: ChannelzSessionInfo = { + ref: channelzRef, + streamTracker: new ChannelzCallTracker(), + messagesSent: 0, + messagesReceived: 0, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + }; + + this.sessions.set(session, channelzSessionInfo); + const clientAddress = session.socket.remoteAddress; + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection established by client ' + clientAddress + ); + this.sessionChildrenTracker.refChild(channelzRef); + } + let connectionAgeTimer: NodeJS.Timeout | null = null; + let connectionAgeGraceTimer: NodeJS.Timeout | null = null; + let sessionClosedByServer = false; + if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { + // Apply a random jitter within a +/-10% range + const jitterMagnitude = this.maxConnectionAgeMs / 10; + const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; + connectionAgeTimer = setTimeout(() => { + sessionClosedByServer = true; + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by max connection age from ' + clientAddress + ); + } + try { + session.goaway( + http2.constants.NGHTTP2_NO_ERROR, + ~(1 << 31), + Buffer.from('max_age') + ); + } catch (e) { + // The goaway can't be sent because the session is already closed + session.destroy(); + return; + } + session.close(); + /* Allow a grace period after sending the GOAWAY before forcibly + * closing the connection. */ + if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { + connectionAgeGraceTimer = setTimeout(() => { + session.destroy(); + }, this.maxConnectionAgeGraceMs).unref?.(); + } + }, this.maxConnectionAgeMs + jitter).unref?.(); + } + const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { + const timeoutTImer = setTimeout(() => { + sessionClosedByServer = true; + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + } + session.close(); + }, this.keepaliveTimeoutMs).unref?.(); + try { + session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTImer); + } + ); + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); + } + }, this.keepaliveTimeMs).unref?.(); + session.on('close', () => { + if (this.channelzEnabled) { + if (!sessionClosedByServer) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by client ' + clientAddress + ); + } + this.sessionChildrenTracker.unrefChild(channelzRef); + unregisterChannelzRef(channelzRef); + } + if (connectionAgeTimer) { + clearTimeout(connectionAgeTimer); + } + if (connectionAgeGraceTimer) { + clearTimeout(connectionAgeGraceTimer); + } + if (keeapliveTimeTimer) { + clearTimeout(keeapliveTimeTimer); + } + this.sessions.delete(session); + }); + }); + } +} + +async function handleUnary<RequestType, ResponseType>( + call: Http2ServerCallStream<RequestType, ResponseType>, + handler: UnaryHandler<RequestType, ResponseType>, + metadata: Metadata, + encoding: string +): Promise<void> { + try { + const request = await call.receiveUnaryMessage(encoding); + + if (request === undefined || call.cancelled) { + return; + } + + const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>( + call, + metadata, + request + ); + + handler.func( + emitter, + ( + err: ServerErrorResponse | ServerStatusResponse | null, + value?: ResponseType | null, + trailer?: Metadata, + flags?: number + ) => { + call.sendUnaryMessage(err, value, trailer, flags); + } + ); + } catch (err) { + call.sendError(err as ServerErrorResponse); + } +} + +function handleClientStreaming<RequestType, ResponseType>( + call: Http2ServerCallStream<RequestType, ResponseType>, + handler: ClientStreamingHandler<RequestType, ResponseType>, + metadata: Metadata, + encoding: string +): void { + const stream = new ServerReadableStreamImpl<RequestType, ResponseType>( + call, + metadata, + handler.deserialize, + encoding + ); + + function respond( + err: ServerErrorResponse | ServerStatusResponse | null, + value?: ResponseType | null, + trailer?: Metadata, + flags?: number + ) { + stream.destroy(); + call.sendUnaryMessage(err, value, trailer, flags); + } + + if (call.cancelled) { + return; + } + + stream.on('error', respond); + handler.func(stream, respond); +} + +async function handleServerStreaming<RequestType, ResponseType>( + call: Http2ServerCallStream<RequestType, ResponseType>, + handler: ServerStreamingHandler<RequestType, ResponseType>, + metadata: Metadata, + encoding: string +): Promise<void> { + try { + const request = await call.receiveUnaryMessage(encoding); + + if (request === undefined || call.cancelled) { + return; + } + + const stream = new ServerWritableStreamImpl<RequestType, ResponseType>( + call, + metadata, + handler.serialize, + request + ); + + handler.func(stream); + } catch (err) { + call.sendError(err as ServerErrorResponse); + } +} + +function handleBidiStreaming<RequestType, ResponseType>( + call: Http2ServerCallStream<RequestType, ResponseType>, + handler: BidiStreamingHandler<RequestType, ResponseType>, + metadata: Metadata, + encoding: string +): void { + const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>( + call, + metadata, + handler.serialize, + handler.deserialize, + encoding + ); + + if (call.cancelled) { + return; + } + + handler.func(stream); +} |
