diff --git a/packages/next/src/server/app-render/app-render-prerender-utils.ts b/packages/next/src/server/app-render/app-render-prerender-utils.ts index 9e21adb2865dd..6b2349f4548b8 100644 --- a/packages/next/src/server/app-render/app-render-prerender-utils.ts +++ b/packages/next/src/server/app-render/app-render-prerender-utils.ts @@ -1,28 +1,53 @@ +import type { Readable } from 'node:stream' import { InvariantError } from '../../shared/lib/invariant-error' +export type AnyStream = ReadableStream | Readable + +function isWebStream(stream: AnyStream): stream is ReadableStream { + return typeof (stream as ReadableStream).tee === 'function' +} + // React's RSC prerender function will emit an incomplete flight stream when using `prerender`. If the connection // closes then whatever hanging chunks exist will be errored. This is because prerender (an experimental feature) // has not yet implemented a concept of resume. For now we will simulate a paused connection by wrapping the stream // in one that doesn't close even when the underlying is complete. export class ReactServerResult { - private _stream: null | ReadableStream + private _stream: null | AnyStream - constructor(stream: ReadableStream) { + constructor(stream: AnyStream) { this._stream = stream } - tee() { + tee(): AnyStream { if (this._stream === null) { throw new Error( 'Cannot tee a ReactServerResult that has already been consumed' ) } - const tee = this._stream.tee() - this._stream = tee[0] - return tee[1] + if (isWebStream(this._stream)) { + const tee = this._stream.tee() + this._stream = tee[0] + return tee[1] + } + + let Readable: typeof import('node:stream').Readable + if (process.env.TURBOPACK) { + Readable = (require('node:stream') as typeof import('node:stream')) + .Readable + } else { + Readable = ( + __non_webpack_require__('node:stream') as typeof import('node:stream') + ).Readable + } + const webStream = Readable.toWeb(this._stream) as ReadableStream + const tee = webStream.tee() + this._stream = Readable.fromWeb( + tee[0] as import('stream/web').ReadableStream + ) + return Readable.fromWeb(tee[1] as import('stream/web').ReadableStream) } - consume() { + consume(): AnyStream { if (this._stream === null) { throw new Error( 'Cannot consume a ReactServerResult that has already been consumed' @@ -55,18 +80,26 @@ export async function createReactServerPrerenderResult( } export async function createReactServerPrerenderResultFromRender( - underlying: ReadableStream + underlying: AnyStream ): Promise { const chunks: Array = [] - const reader = underlying.getReader() - while (true) { - const { done, value } = await reader.read() - if (done) { - break - } else { - chunks.push(value) + + if (isWebStream(underlying)) { + const reader = underlying.getReader() + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } else { + chunks.push(value) + } + } + } else { + for await (const chunk of underlying) { + chunks.push(chunk instanceof Uint8Array ? chunk : new Uint8Array(chunk)) } } + return new ReactServerPrerenderResult(chunks) } export class ReactServerPrerenderResult { diff --git a/packages/next/src/server/app-render/app-render.tsx b/packages/next/src/server/app-render/app-render.tsx index d0c7a0b164aba..395822716767a 100644 --- a/packages/next/src/server/app-render/app-render.tsx +++ b/packages/next/src/server/app-render/app-render.tsx @@ -54,7 +54,9 @@ import { getClientPrerender, processPrelude as processPreludeOp, createDocumentClosingStream, + teeStream, } from './stream-ops' +import type { AnyStream } from './stream-ops' import { stripInternalQueries } from '../internal-utils' import { NEXT_HMR_REFRESH_HEADER, @@ -1095,7 +1097,7 @@ async function generateDynamicFlightRenderResultWithStagesInDev( } let debugChannel: DebugChannelPair | undefined - let stream: ReadableStream + let stream: AnyStream if ( // We only do this flow if we can safely recreate the store from scratch @@ -1129,11 +1131,11 @@ async function generateDynamicFlightRenderResultWithStagesInDev( ) if (shouldValidate) { - let validationDebugChannelClient: Readable | undefined = undefined + let validationDebugChannelClient: AnyStream | undefined = undefined if (returnedDebugChannel) { - const [t1, t2] = returnedDebugChannel.clientSide.readable.tee() + const [t1, t2] = teeStream(returnedDebugChannel.clientSide.readable) returnedDebugChannel.clientSide.readable = t1 - validationDebugChannelClient = nodeStreamFromReadableStream(t2) + validationDebugChannelClient = t2 } consoleAsyncStorage.run( { dim: true }, @@ -1898,7 +1900,7 @@ function App({ }: { /* eslint-disable @next/internal/no-ambiguous-jsx -- React Client */ reactServerStream: Readable | BinaryStreamOf - reactDebugStream: Readable | ReadableStream | undefined + reactDebugStream: AnyStream | undefined debugEndTime: number | undefined preinitScripts: () => void ServerInsertedHTMLProvider: ComponentType<{ @@ -2005,7 +2007,7 @@ function ErrorApp({ // certain object shape. The generic type is not used directly in the type so it // requires a disabling of the eslint rule disallowing unused vars // eslint-disable-next-line @typescript-eslint/no-unused-vars -export type BinaryStreamOf = ReadableStream +export type BinaryStreamOf = AnyStream async function renderToHTMLOrFlightImpl( req: BaseNextRequest, @@ -2698,7 +2700,7 @@ async function renderToStream( metadata: AppPageRenderResultMetadata, createRequestStore: (() => RequestStore) | undefined, fallbackParams: OpaqueFallbackRouteParams | null -): Promise> { +): Promise { /* eslint-disable @next/internal/no-ambiguous-jsx -- React Client */ const { assetPrefix, @@ -2845,7 +2847,7 @@ async function renderToStream( ) let reactServerResult: null | ReactServerResult = null - let reactDebugStream: ReadableStream | undefined + let reactDebugStream: AnyStream | undefined const setHeader = res.setHeader.bind(res) const appendHeader = res.appendHeader.bind(res) @@ -2915,11 +2917,11 @@ async function renderToStream( serverComponentsErrorHandler ) - let validationDebugChannelClient: Readable | undefined = undefined + let validationDebugChannelClient: AnyStream | undefined = undefined if (returnedDebugChannel) { - const [t1, t2] = returnedDebugChannel.clientSide.readable.tee() + const [t1, t2] = teeStream(returnedDebugChannel.clientSide.readable) returnedDebugChannel.clientSide.readable = t1 - validationDebugChannelClient = nodeStreamFromReadableStream(t2) + validationDebugChannelClient = t2 } consoleAsyncStorage.run( @@ -2962,8 +2964,9 @@ async function renderToStream( } if (debugChannel && setReactDebugChannel) { - const [readableSsr, readableBrowser] = - debugChannel.clientSide.readable.tee() + const [readableSsr, readableBrowser] = teeStream( + debugChannel.clientSide.readable + ) reactDebugStream = readableSsr @@ -3059,8 +3062,9 @@ async function renderToStream( const debugChannel = setReactDebugChannel && createDebugChannel() if (debugChannel) { - const [readableSsr, readableBrowser] = - debugChannel.clientSide.readable.tee() + const [readableSsr, readableBrowser] = teeStream( + debugChannel.clientSide.readable + ) reactDebugStream = readableSsr @@ -3482,8 +3486,8 @@ async function renderWithRestartOnCacheMissInDev( initialStageController.advanceStage(RenderStage.EarlyStatic) startTime = performance.now() + performance.timeOrigin - const streamPair = workUnitAsyncStorage - .run( + const streamPair = teeStream( + workUnitAsyncStorage.run( requestStore, renderToFlightStream, ComponentMod, @@ -3498,7 +3502,7 @@ async function renderWithRestartOnCacheMissInDev( signal: initialReactController.signal, } ) - .tee() + ) // If we abort the render, we want to reject the stage-dependent promises as well. // Note that we want to install this listener after the render is started @@ -3516,7 +3520,11 @@ async function renderWithRestartOnCacheMissInDev( initialDataController.signal.addEventListener('abort', () => { accumulatedChunksPromise.catch(() => {}) - stream.cancel() + if (stream instanceof ReadableStream) { + stream.cancel() + } else { + stream.destroy() + } }) return { @@ -3632,8 +3640,8 @@ async function renderWithRestartOnCacheMissInDev( finalStageController.advanceStage(RenderStage.EarlyStatic) startTime = performance.now() + performance.timeOrigin - const streamPair = workUnitAsyncStorage - .run( + const streamPair = teeStream( + workUnitAsyncStorage.run( requestStore, renderToFlightStream, ComponentMod, @@ -3647,7 +3655,7 @@ async function renderWithRestartOnCacheMissInDev( debugChannel: debugChannel?.serverSide, } ) - .tee() + ) return { stream: streamPair[0], @@ -3699,71 +3707,116 @@ interface AccumulatedStreamChunks { } async function accumulateStreamChunks( - stream: ReadableStream, + stream: AnyStream, stageController: StagedRenderingController, signal: AbortSignal | null ): Promise { const staticChunks: Array = [] const runtimeChunks: Array = [] const dynamicChunks: Array = [] - const reader = stream.getReader() - let cancelled = false - function cancel() { - if (!cancelled) { - cancelled = true - reader.cancel() + if (stream instanceof ReadableStream) { + const reader = stream.getReader() + + let cancelled = false + function cancel() { + if (!cancelled) { + cancelled = true + reader.cancel() + } } - } - if (signal) { - signal.addEventListener('abort', cancel, { once: true }) - } + if (signal) { + signal.addEventListener('abort', cancel, { once: true }) + } - try { - while (!cancelled) { - const { done, value } = await reader.read() - if (done) { - cancel() - break - } - switch (stageController.currentStage) { - case RenderStage.Before: - throw new InvariantError( - 'Unexpected stream chunk while in Before stage' - ) - case RenderStage.EarlyStatic: - case RenderStage.Static: - staticChunks.push(value) - // fall through - case RenderStage.EarlyRuntime: - case RenderStage.Runtime: - runtimeChunks.push(value) - // fall through - case RenderStage.Dynamic: - dynamicChunks.push(value) - break - case RenderStage.Abandoned: - // If the render was abandoned, we won't use the chunks, - // so there's no need to accumulate them - break - default: - stageController.currentStage satisfies never + try { + while (!cancelled) { + const { done, value } = await reader.read() + if (done) { + cancel() break + } + accumulateChunk( + stageController, + staticChunks, + runtimeChunks, + dynamicChunks, + value + ) + } + } catch (err) { + // When we cancel the reader we may reject the read. + // Only swallow errors caused by our intentional cancel(); + // re-throw unexpected errors to avoid silently returning partial data. + if (!cancelled) { + throw err } } - } catch (err) { - // When we cancel the reader we may reject the read. - // Only swallow errors caused by our intentional cancel(); - // re-throw unexpected errors to avoid silently returning partial data. - if (!cancelled) { - throw err + } else { + const nodeStream = stream as Readable + let cancelled = false + function cancel() { + if (!cancelled) { + cancelled = true + nodeStream.destroy() + } + } + + if (signal) { + signal.addEventListener('abort', cancel, { once: true }) + } + + try { + for await (const value of nodeStream) { + if (cancelled) break + accumulateChunk( + stageController, + staticChunks, + runtimeChunks, + dynamicChunks, + value + ) + } + } catch (err) { + if (!cancelled) { + throw err + } } } return { staticChunks, runtimeChunks, dynamicChunks } } +function accumulateChunk( + stageController: StagedRenderingController, + staticChunks: Array, + runtimeChunks: Array, + dynamicChunks: Array, + value: Uint8Array +): void { + switch (stageController.currentStage) { + case RenderStage.Before: + throw new InvariantError('Unexpected stream chunk while in Before stage') + case RenderStage.EarlyStatic: + case RenderStage.Static: + staticChunks.push(value) + // fall through + case RenderStage.EarlyRuntime: + case RenderStage.Runtime: + runtimeChunks.push(value) + // fall through + case RenderStage.Dynamic: + dynamicChunks.push(value) + break + case RenderStage.Abandoned: + break + default: + stageController.currentStage satisfies never + break + } +} + async function countStaticStageBytes( stream: ReadableStream, stageController: StagedRenderingController @@ -3982,7 +4035,7 @@ async function spawnStaticShellValidationInDevImpl( ctx: AppRenderContext, requestStore: RequestStore, fallbackRouteParams: OpaqueFallbackRouteParams | null, - debugChannelClient: Readable | undefined + debugChannelClient: AnyStream | undefined ): Promise { const debug = process.env.NEXT_PRIVATE_DEBUG_VALIDATION === '1' ? console.log : undefined @@ -4018,9 +4071,11 @@ async function spawnStaticShellValidationInDevImpl( let debugChunks: Uint8Array[] | null = null if (debugChannelClient) { debugChunks = [] - debugChannelClient.on('data', (c) => { - debugChunks!.push(c) - }) + ;(async () => { + for await (const c of debugChannelClient) { + debugChunks.push(c) + } + })() } const accumulatedChunks = await accumulatedChunksPromise @@ -4475,6 +4530,8 @@ async function validateInstantConfigs( payload: initialRscPayload, stageEndTimes, } = await collectStagedSegmentData( + ctx.componentMod, + renderToFlightStream, { [RenderStage.Static]: accumulatedChunks.staticChunks, [RenderStage.Runtime]: accumulatedChunks.runtimeChunks, @@ -4547,6 +4604,8 @@ async function validateInstantConfigs( const { stream: serverStream, debugStream } = await createCombinedPayloadStream( + ctx.componentMod, + renderToFlightStream, payloadResult.payload, extraChunksController, reactController.signal, @@ -4727,7 +4786,7 @@ async function validateInstantConfigs( } type PrerenderToStreamResult = { - stream: ReadableStream + stream: AnyStream digestErrorsMap: Map ssrErrors: Array dynamicAccess?: null | Array @@ -5568,12 +5627,14 @@ async function prerenderToStream( ) } - let htmlStream: ReadableStream = prelude + let htmlStream: AnyStream = prelude if (postponed != null) { // We postponed but nothing dynamic was used. We resume the render now and immediately abort it // so we can set all the postponed boundaries to client render mode before we store the HTML response const foreverStream = createPendingStream() - const resumePrelude = await resumeAndAbort( + const resumePrelude = await workUnitAsyncStorage.run( + finalServerPrerenderStore, + resumeAndAbort, // eslint-disable-next-line @next/internal/no-ambiguous-jsx = prelude + let htmlStream: AnyStream = prelude if (postponed != null) { // We postponed but nothing dynamic was used. We resume the render now and immediately abort it // so we can set all the postponed boundaries to client render mode before we store the HTML response @@ -6311,30 +6372,3 @@ function WarnForBypassCachesInDev({ route }: { route: string }) { ) return null } - -function nodeStreamFromReadableStream(stream: ReadableStream) { - if (process.env.NEXT_RUNTIME === 'edge') { - throw new InvariantError( - 'nodeStreamFromReadableStream cannot be used in the edge runtime' - ) - } else { - const reader = stream.getReader() - - const { Readable } = require('node:stream') as typeof import('node:stream') - - return new Readable({ - read() { - reader - .read() - .then(({ done, value }) => { - if (done) { - this.push(null) - } else { - this.push(value) - } - }) - .catch((err) => this.destroy(err)) - }, - }) - } -} diff --git a/packages/next/src/server/app-render/debug-channel-server.node.ts b/packages/next/src/server/app-render/debug-channel-server.node.ts new file mode 100644 index 0000000000000..b96372e10e006 --- /dev/null +++ b/packages/next/src/server/app-render/debug-channel-server.node.ts @@ -0,0 +1,37 @@ +/** + * Node debug channel implementation. + * Loaded by debug-channel-server.ts when __NEXT_USE_NODE_STREAMS is enabled. + */ + +import { PassThrough, Writable } from 'node:stream' +import type { DebugChannelPair } from './debug-channel-server.web' + +export function createDebugChannel(): DebugChannelPair | undefined { + if (process.env.NODE_ENV === 'production') { + return undefined + } + return createNodeDebugChannel() +} + +function createNodeDebugChannel(): DebugChannelPair { + const readable = new PassThrough() + + // Use a plain Writable instead of exposing the PassThrough directly. + // React's renderToPipeableStream detects .read() on the debugChannel and + // enters bidirectional mode, reading its own output back as commands. + const writable = new Writable({ + write(chunk, _encoding, callback) { + readable.push(chunk) + callback() + }, + final(callback) { + readable.push(null) + callback() + }, + }) + + return { + serverSide: writable, + clientSide: { readable }, + } +} diff --git a/packages/next/src/server/app-render/debug-channel-server.ts b/packages/next/src/server/app-render/debug-channel-server.ts index 3d122179a36bf..2d36fd08dffe6 100644 --- a/packages/next/src/server/app-render/debug-channel-server.ts +++ b/packages/next/src/server/app-render/debug-channel-server.ts @@ -1,15 +1,25 @@ /** * Compile-time switcher for debug channel operations. * - * Simple re-export from the web implementation. - * A future change will add a conditional branch for node streams. + * When __NEXT_USE_NODE_STREAMS is true, uses a Node PassThrough-based channel. + * Otherwise, uses web WritableStream APIs. */ export type { DebugChannelPair, DebugChannelServer, } from './debug-channel-server.web' -export { - createDebugChannel, - toNodeDebugChannel, -} from './debug-channel-server.web' +type DebugChannelMod = { + createDebugChannel: typeof import('./debug-channel-server.web').createDebugChannel +} + +let _m: DebugChannelMod +if (process.env.__NEXT_USE_NODE_STREAMS) { + _m = + require('./debug-channel-server.node') as typeof import('./debug-channel-server.node') +} else { + _m = + require('./debug-channel-server.web') as typeof import('./debug-channel-server.web') +} + +export const createDebugChannel = _m.createDebugChannel diff --git a/packages/next/src/server/app-render/debug-channel-server.web.ts b/packages/next/src/server/app-render/debug-channel-server.web.ts index 7e2ede83d0036..4d115fe4ed0b8 100644 --- a/packages/next/src/server/app-render/debug-channel-server.web.ts +++ b/packages/next/src/server/app-render/debug-channel-server.web.ts @@ -3,20 +3,20 @@ * Loaded by debug-channel-server.ts. */ -// Types defined inline for now; will move to debug-channel-server.node.ts later. +import type { AnyStream } from './app-render-prerender-utils' + export type DebugChannelPair = { serverSide: DebugChannelServer clientSide: DebugChannelClient } -export type DebugChannelServer = { - readable?: ReadableStream - writable: WritableStream -} +// Opaque: PassThrough on node, { writable: WritableStream } on web. +// Each React render API handles its own variant. + +export type DebugChannelServer = any type DebugChannelClient = { - readable: ReadableStream - writable?: WritableStream + readable: AnyStream } export function createDebugChannel(): DebugChannelPair | undefined { @@ -52,15 +52,3 @@ export function createWebDebugChannel(): DebugChannelPair { clientSide: { readable: clientSideReadable }, } } - -/** - * toNodeDebugChannel is a no-op stub on the web path. - * It should never be called in edge/web builds. - */ -export function toNodeDebugChannel( - _webDebugChannel: DebugChannelServer -): never { - throw new Error( - 'toNodeDebugChannel cannot be used in edge/web runtime, this is a bug in the Next.js codebase' - ) -} diff --git a/packages/next/src/server/app-render/flight-render-result.ts b/packages/next/src/server/app-render/flight-render-result.ts index 005c618294e8b..36eb8495a210b 100644 --- a/packages/next/src/server/app-render/flight-render-result.ts +++ b/packages/next/src/server/app-render/flight-render-result.ts @@ -1,12 +1,13 @@ import { RSC_CONTENT_TYPE_HEADER } from '../../client/components/app-router-headers' import RenderResult, { type RenderResultMetadata } from '../render-result' +import type { AnyStream } from './stream-ops' /** * Flight Response is always set to RSC_CONTENT_TYPE_HEADER to ensure it does not get interpreted as HTML. */ export class FlightRenderResult extends RenderResult { constructor( - response: string | ReadableStream, + response: string | AnyStream, metadata: RenderResultMetadata = {}, waitUntil?: Promise ) { diff --git a/packages/next/src/server/app-render/get-layer-assets.tsx b/packages/next/src/server/app-render/get-layer-assets.tsx index 40d57c81ced65..af5557e72d5e1 100644 --- a/packages/next/src/server/app-render/get-layer-assets.tsx +++ b/packages/next/src/server/app-render/get-layer-assets.tsx @@ -87,6 +87,7 @@ export function getLayerAssets({ async: true, key: `script-${index}`, nonce: ctx.nonce, + crossOrigin: ctx.renderOpts.crossOrigin, }) }) : [] diff --git a/packages/next/src/server/app-render/instant-validation/instant-validation.tsx b/packages/next/src/server/app-render/instant-validation/instant-validation.tsx index b4c547a02d811..2dce9a9ebc54a 100644 --- a/packages/next/src/server/app-render/instant-validation/instant-validation.tsx +++ b/packages/next/src/server/app-render/instant-validation/instant-validation.tsx @@ -37,10 +37,10 @@ import { createNodeStreamFromChunks, } from './stream-utils' import { createDebugChannel } from '../debug-channel-server' +import type { FlightComponentMod } from '../stream-ops' + // eslint-disable-next-line import/no-extraneous-dependencies import { createFromNodeStream } from 'react-server-dom-webpack/client' -// eslint-disable-next-line import/no-extraneous-dependencies -import { renderToReadableStream } from 'react-server-dom-webpack/server' import { addSearchParamsIfPageSegment, isGroupSegment, @@ -198,7 +198,16 @@ export type StageEndTimes = { * Splits an existing staged stream (represented as arrays of chunks) * into separate staged streams (also in arrays-of-chunks form), one for each segment. * */ +type RenderToFlightStream = ( + ComponentMod: FlightComponentMod, + payload: any, + clientModules: any, + opts: any +) => AsyncIterable + export async function collectStagedSegmentData( + ComponentMod: FlightComponentMod, + renderFlightStream: RenderToFlightStream, fullPageChunks: StageChunks, fullPageDebugChunks: Uint8Array[] | null, startTime: number, @@ -286,7 +295,8 @@ export async function collectStagedSegmentData( ? createDebugChannel() : undefined - const itemStream = renderToReadableStream( + const itemStream = renderFlightStream( + ComponentMod, data, clientReferenceManifest.clientModules, { @@ -319,14 +329,14 @@ export async function collectStagedSegmentData( await Promise.all([ // accumulate Flight chunks (async () => { - for await (const chunk of itemStream.values()) { + for await (const chunk of itemStream) { writeChunk(cacheEntry.chunks, controller.currentStage, chunk) } })(), // accumulate Debug chunks segmentDebugChannel && (async () => { - for await (const chunk of segmentDebugChannel.clientSide.readable.values()) { + for await (const chunk of segmentDebugChannel.clientSide.readable) { cacheEntry.debugChunks!.push(chunk) } })(), @@ -494,6 +504,8 @@ function writeChunk( * to provide extra debug info. * */ export async function createCombinedPayloadStream( + ComponentMod: FlightComponentMod, + renderFlightStream: RenderToFlightStream, payload: InitialRSCPayload, extraChunksAbortController: AbortController, renderSignal: AbortSignal, @@ -514,7 +526,8 @@ export async function createCombinedPayloadStream( await runInSequentialTasks( () => { - const stream = renderToReadableStream( + const stream = renderFlightStream( + ComponentMod, payload, clientReferenceManifest.clientModules, { @@ -546,7 +559,7 @@ export async function createCombinedPayloadStream( streamFinished = Promise.all([ // Accumulate Flight chunks (async () => { - for await (const chunk of stream.values()) { + for await (const chunk of stream) { allChunks.push(chunk) if (isRenderable) { renderableChunks.push(chunk) @@ -556,7 +569,7 @@ export async function createCombinedPayloadStream( // Accumulate debug chunks debugChannel && (async () => { - for await (const chunk of debugChannel.clientSide.readable.values()) { + for await (const chunk of debugChannel.clientSide.readable) { debugChunks!.push(chunk) } })(), diff --git a/packages/next/src/server/app-render/stream-ops.node.ts b/packages/next/src/server/app-render/stream-ops.node.ts new file mode 100644 index 0000000000000..4d1cd800f2580 --- /dev/null +++ b/packages/next/src/server/app-render/stream-ops.node.ts @@ -0,0 +1,446 @@ +/** + * Node.js stream operations for the rendering pipeline. + * Loaded by stream-ops.ts when process.env.__NEXT_USE_NODE_STREAMS is true. + * + * AnyStream = AnyStreamType so the exported type surface matches stream-ops.web.ts, + * allowing the switcher to assign either module without casts. + * Rendering uses pipeable APIs; continue functions wrap the existing web + * transforms via Readable.fromWeb() on their output. + */ + +import type { PostponedState, PrerenderOptions } from 'react-dom/static' +import { + renderToPipeableStream, + resumeToPipeableStream, +} from 'react-dom/server' +import { prerender } from 'react-dom/static' +import { PassThrough, Readable } from 'node:stream' + +import type { ReactDOMServerReadableStream } from 'react-dom/server' +import { + continueFizzStream as webContinueFizzStream, + continueStaticPrerender as webContinueStaticPrerender, + continueDynamicPrerender as webContinueDynamicPrerender, + continueStaticFallbackPrerender as webContinueStaticFallbackPrerender, + continueDynamicHTMLResume as webContinueDynamicHTMLResume, + streamToBuffer as webStreamToBuffer, + streamToString as webStreamToString, + createDocumentClosingStream as webCreateDocumentClosingStream, + createRuntimePrefetchTransformStream, +} from '../stream-utils/node-web-streams-helper' +import { createInlinedDataReadableStream } from './use-flight-response' +import type { AnyStream as AnyStreamType } from './app-render-prerender-utils' +import { DetachedPromise } from '../../lib/detached-promise' +import { getTracer } from '../lib/trace/tracer' +import { AppRenderSpan } from '../lib/trace/constants' + +// --------------------------------------------------------------------------- +// Re-export shared types from the web module +// --------------------------------------------------------------------------- + +export type { + ContinueStreamSharedOptions, + ContinueFizzStreamOptions, + ContinueStaticPrerenderOptions, + ContinueDynamicHTMLResumeOptions, + ServerPrerenderComponentMod, + FlightPayload, + FlightClientModules, + FlightRenderOptions, +} from './stream-ops.web' + +// --------------------------------------------------------------------------- +// AnyStream matches stream-ops.web.ts so both modules have the same type surface +// --------------------------------------------------------------------------- + +export type AnyStream = AnyStreamType + +export type FlightComponentMod = { + renderToReadableStream: ( + model: any, + webpackMap: any, + options?: any + ) => ReadableStream + renderToPipeableStream?: ( + model: any, + webpackMap: any, + options?: any + ) => { + pipe( + destination: Writable + ): Writable + abort(reason?: unknown): void + } +} + +export type FizzStreamResult = { + stream: AnyStream + allReady: Promise + abort?: (reason?: unknown) => void +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +type WebReadableStream = import('stream/web').ReadableStream + +function nodeReadableToWebReadableStream( + stream: Readable | ReadableStream +): ReadableStream { + if (stream instanceof ReadableStream) { + return stream + } + // Readable.toWeb returns stream/web ReadableStream which is structurally + // identical to the global ReadableStream. + return Readable.toWeb(stream) as unknown as ReadableStream +} + +function webToReadable( + stream: ReadableStream | Readable +): Readable { + if (stream instanceof Readable) { + return stream + } + return Readable.fromWeb(stream as WebReadableStream) +} + +// --------------------------------------------------------------------------- +// Rendering functions (output Node Readable natively via PassThrough) +// --------------------------------------------------------------------------- + +export function renderToFlightStream( + ComponentMod: FlightComponentMod, + payload: any, + clientModules: any, + opts: any, + runInContext?: (fn: () => T) => T +): AnyStream { + const run: (fn: () => T) => T = runInContext ?? ((fn) => fn()) + + if (ComponentMod.renderToPipeableStream) { + const pt = new PassThrough() + const pipeable = run(() => + ComponentMod.renderToPipeableStream!(payload, clientModules, opts) + ) + pipeable.pipe(pt) + return pt + } + + // Fallback: use web API and convert + const webStream = run(() => + ComponentMod.renderToReadableStream(payload, clientModules, opts) + ) + return webToReadable(webStream) +} + +export async function renderToFizzStream( + element: React.ReactElement, + streamOptions: any, + runInContext?: (fn: () => T) => T +): Promise { + const run: (fn: () => T) => T = runInContext ?? ((fn) => fn()) + + const pt = new PassThrough() + const shellReady = new DetachedPromise() + const allReady = new DetachedPromise() + + // Node.js renderToPipeableStream passes a plain object to onHeaders, + // but callers expect a web Headers instance. + const originalOnHeaders = streamOptions?.onHeaders + const wrappedOnHeaders = originalOnHeaders + ? (headers: Record) => { + originalOnHeaders(new Headers(headers)) + } + : undefined + + const pipeable = run(() => + getTracer().trace(AppRenderSpan.renderToReadableStream, () => + renderToPipeableStream(element, { + ...streamOptions, + onHeaders: wrappedOnHeaders, + onShellReady() { + streamOptions?.onShellReady?.() + pipeable.pipe(pt) + shellReady.resolve() + }, + onShellError(error: unknown) { + streamOptions?.onShellError?.(error) + shellReady.reject(error) + }, + onAllReady() { + streamOptions?.onAllReady?.() + allReady.resolve() + }, + onError: streamOptions?.onError, + }) + ) + ) + + await shellReady.promise + + return { + stream: pt, + allReady: allReady.promise, + abort: (reason?: unknown) => pipeable.abort(reason), + } +} + +export async function resumeToFizzStream( + element: React.ReactElement, + postponedState: PostponedState, + streamOptions: any, + runInContext?: (fn: () => T) => T +): Promise { + const run: (fn: () => T) => T = runInContext ?? ((fn) => fn()) + + const pt = new PassThrough() + const allReady = new DetachedPromise() + + const pipeable = await run(() => + resumeToPipeableStream(element, postponedState, { + ...streamOptions, + onAllReady() { + streamOptions?.onAllReady?.() + allReady.resolve() + }, + }) + ) + pipeable.pipe(pt) + + return { + stream: pt, + allReady: allReady.promise, + abort: (reason?: unknown) => pipeable.abort(reason), + } +} + +export async function resumeAndAbort( + element: React.ReactElement, + postponed: PostponedState | null, + opts: any +): Promise { + const pt = new PassThrough() + const pipeable = await resumeToPipeableStream( + element, + postponed as PostponedState, + opts + ) + pipeable.pipe(pt) + pipeable.abort(opts?.signal?.reason) + return pt +} + +// --------------------------------------------------------------------------- +// Continue function wrappers +// Bridge Node Readable → web, apply existing web transforms, Readable.fromWeb() +// --------------------------------------------------------------------------- + +export async function continueFizzStream( + renderStream: AnyStream, + opts: import('./stream-ops.web').ContinueFizzStreamOptions +): Promise { + const webOpts = { + ...opts, + inlinedDataStream: opts.inlinedDataStream + ? nodeReadableToWebReadableStream(opts.inlinedDataStream) + : undefined, + } + // The web continueFizzStream reads renderStream.allReady from the stream + // object itself (ReactDOMServerReadableStream). A plain ReadableStream from + // readableToWeb() won't have that property, so we attach it from opts. + const webStream = nodeReadableToWebReadableStream(renderStream) + const fizzLike = Object.assign(webStream, { + allReady: opts.allReady ?? Promise.resolve(), + }) as ReactDOMServerReadableStream + const webResult = await webContinueFizzStream(fizzLike, webOpts) + return webToReadable(webResult) +} + +export async function continueStaticPrerender( + prerenderStream: AnyStream, + opts: import('./stream-ops.web').ContinueStaticPrerenderOptions +): Promise { + const webResult = await webContinueStaticPrerender( + nodeReadableToWebReadableStream(prerenderStream), + { + ...opts, + inlinedDataStream: nodeReadableToWebReadableStream( + opts.inlinedDataStream + ), + } + ) + return webToReadable(webResult) +} + +export async function continueDynamicPrerender( + prerenderStream: AnyStream, + opts: { + getServerInsertedHTML: () => Promise + getServerInsertedMetadata: () => Promise + deploymentId: string | undefined + } +): Promise { + const webResult = await webContinueDynamicPrerender( + nodeReadableToWebReadableStream(prerenderStream), + opts + ) + return webToReadable(webResult) +} + +export async function continueStaticFallbackPrerender( + prerenderStream: AnyStream, + opts: import('./stream-ops.web').ContinueStaticPrerenderOptions +): Promise { + const webResult = await webContinueStaticFallbackPrerender( + nodeReadableToWebReadableStream(prerenderStream), + { + ...opts, + inlinedDataStream: nodeReadableToWebReadableStream( + opts.inlinedDataStream + ), + } + ) + return webToReadable(webResult) +} + +export async function continueDynamicHTMLResume( + renderStream: AnyStream, + opts: import('./stream-ops.web').ContinueDynamicHTMLResumeOptions +): Promise { + const webResult = await webContinueDynamicHTMLResume( + nodeReadableToWebReadableStream(renderStream), + { + ...opts, + inlinedDataStream: nodeReadableToWebReadableStream( + opts.inlinedDataStream + ), + } + ) + return webToReadable(webResult) +} + +// --------------------------------------------------------------------------- +// Utility functions (Node-native) +// --------------------------------------------------------------------------- + +export function chainStreams(...streams: AnyStream[]): AnyStream { + if (streams.length === 0) { + const pt = new PassThrough() + pt.end() + return pt + } + + if (streams.length === 1) { + return streams[0] + } + + const out = new PassThrough() + let i = 0 + + function pipeNext() { + if (i >= streams.length) { + out.end() + return + } + const current = webToReadable(streams[i++]) + current.pipe(out, { end: false }) + current.on('end', pipeNext) + current.on('error', (err) => out.destroy(err)) + } + + pipeNext() + return out +} + +export async function streamToBuffer(stream: AnyStream): Promise { + return webStreamToBuffer(nodeReadableToWebReadableStream(stream)) +} + +export async function streamToUint8Array( + stream: AnyStream +): Promise { + const chunks: Buffer[] = [] + for await (const chunk of webToReadable(stream)) { + chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk) + } + return Buffer.concat(chunks) +} + +export async function streamToString(stream: AnyStream): Promise { + return webStreamToString(nodeReadableToWebReadableStream(stream)) +} + +export function createInlinedDataStream( + source: AnyStream, + nonce: string | undefined, + formState: unknown | null +): AnyStream { + const webSource = nodeReadableToWebReadableStream(source) + const webResult = createInlinedDataReadableStream(webSource, nonce, formState) + return webToReadable(webResult) +} + +export function createPendingStream(): AnyStream { + return new PassThrough() +} + +export function createDocumentClosingStream(): AnyStream { + const webStream = webCreateDocumentClosingStream() + return webToReadable(webStream) +} + +export function createOnHeadersCallback( + appendHeader: (key: string, value: string) => void +): NonNullable { + return (headers: Headers) => { + headers.forEach((value, key) => { + appendHeader(key, value) + }) + } +} + +export function pipeRuntimePrefetchTransform( + stream: AnyStream, + sentinel: number, + isPartial: boolean, + staleTime: number +): AnyStream { + const webStream = nodeReadableToWebReadableStream(stream) + const transformed = webStream.pipeThrough( + createRuntimePrefetchTransformStream(sentinel, isPartial, staleTime) + ) + return webToReadable(transformed) +} + +// --------------------------------------------------------------------------- +// Re-exports (no stream involvement, identical to web) +// --------------------------------------------------------------------------- + +export async function processPrelude(unprocessedPrelude: AnyStream) { + const [prelude, peek] = + nodeReadableToWebReadableStream(unprocessedPrelude).tee() + + const reader = peek.getReader() + const firstResult = await reader.read() + reader.cancel() + + return { + prelude: webToReadable(prelude) as AnyStream, + preludeIsEmpty: firstResult.done === true, + } +} + +export function getServerPrerender(ComponentMod: { + prerender: (...args: any[]) => Promise +}): (...args: any[]) => any { + return ComponentMod.prerender +} + +export const getClientPrerender: typeof import('react-dom/static').prerender = + prerender + +export function teeStream(stream: AnyStream): [AnyStream, AnyStream] { + const [s1, s2] = nodeReadableToWebReadableStream(stream).tee() + return [webToReadable(s1), webToReadable(s2)] +} diff --git a/packages/next/src/server/app-render/stream-ops.ts b/packages/next/src/server/app-render/stream-ops.ts index c703556f440c7..ae0614ba2e9c8 100644 --- a/packages/next/src/server/app-render/stream-ops.ts +++ b/packages/next/src/server/app-render/stream-ops.ts @@ -1,8 +1,11 @@ /** * Compile-time switcher for stream operations. * - * PR2: Simple re-export from the web implementation. - * A future change will add a conditional branch for node streams. + * When __NEXT_USE_NODE_STREAMS is true, uses Node.js pipeable stream APIs. + * Otherwise, uses web ReadableStream APIs. + * + * Both modules export AnyStream = AnyStreamType so their type surfaces are + * structurally identical — no `as unknown as` cast is needed. */ export type { AnyStream, @@ -18,26 +21,35 @@ export type { FizzStreamResult, } from './stream-ops.web' -export { - continueFizzStream, - continueStaticPrerender, - continueDynamicPrerender, - continueStaticFallbackPrerender, - continueDynamicHTMLResume, - streamToBuffer, - chainStreams, - createDocumentClosingStream, - processPrelude, - nodeReadableToWeb, - createInlinedDataStream, - createPendingStream, - createOnHeadersCallback, - resumeAndAbort, - renderToFlightStream, - streamToString, - renderToFizzStream, - resumeToFizzStream, - getServerPrerender, - getClientPrerender, - pipeRuntimePrefetchTransform, -} from './stream-ops.web' +type WebMod = typeof import('./stream-ops.web') + +let _m: WebMod +if (process.env.__NEXT_USE_NODE_STREAMS) { + _m = require('./stream-ops.node') as typeof import('./stream-ops.node') +} else { + _m = require('./stream-ops.web') as typeof import('./stream-ops.web') +} + +export const continueFizzStream = _m.continueFizzStream +export const continueStaticPrerender = _m.continueStaticPrerender +export const continueDynamicPrerender = _m.continueDynamicPrerender +export const continueStaticFallbackPrerender = + _m.continueStaticFallbackPrerender +export const continueDynamicHTMLResume = _m.continueDynamicHTMLResume +export const streamToBuffer = _m.streamToBuffer +export const chainStreams = _m.chainStreams +export const createDocumentClosingStream = _m.createDocumentClosingStream +export const processPrelude = _m.processPrelude +export const createInlinedDataStream = _m.createInlinedDataStream +export const createPendingStream = _m.createPendingStream +export const createOnHeadersCallback = _m.createOnHeadersCallback +export const resumeAndAbort = _m.resumeAndAbort +export const renderToFlightStream = _m.renderToFlightStream +export const streamToString = _m.streamToString +export const streamToUint8Array = _m.streamToUint8Array +export const renderToFizzStream = _m.renderToFizzStream +export const resumeToFizzStream = _m.resumeToFizzStream +export const getServerPrerender = _m.getServerPrerender +export const getClientPrerender = _m.getClientPrerender +export const pipeRuntimePrefetchTransform = _m.pipeRuntimePrefetchTransform +export const teeStream = _m.teeStream diff --git a/packages/next/src/server/app-render/stream-ops.web.ts b/packages/next/src/server/app-render/stream-ops.web.ts index 509e5143b297c..b9107c4a35bed 100644 --- a/packages/next/src/server/app-render/stream-ops.web.ts +++ b/packages/next/src/server/app-render/stream-ops.web.ts @@ -1,6 +1,9 @@ /** * Web stream operations for the rendering pipeline. - * Loaded by stream-ops.ts (re-export in this PR, conditional switcher later). + * Loaded by stream-ops.ts when __NEXT_USE_NODE_STREAMS is false (default). + * + * AnyStream = AnyStreamType so the exported type surface matches stream-ops.node.ts, + * allowing the switcher to assign either module without `as unknown as`. */ import type { PostponedState, PrerenderOptions } from 'react-dom/static' @@ -12,11 +15,21 @@ import { streamToString as webStreamToString, createRuntimePrefetchTransformStream, continueFizzStream as webContinueFizzStream, + continueStaticPrerender as webContinueStaticPrerender, + continueDynamicPrerender as webContinueDynamicPrerender, + continueStaticFallbackPrerender as webContinueStaticFallbackPrerender, + continueDynamicHTMLResume as webContinueDynamicHTMLResume, + streamToBuffer as webStreamToBuffer, + streamToUint8Array as webStreamToUint8Array, + chainStreams as webChainStreams, + createDocumentClosingStream as webCreateDocumentClosingStream, } from '../stream-utils/node-web-streams-helper' import { createInlinedDataReadableStream } from './use-flight-response' +import { processPrelude as webProcessPrelude } from './app-render-prerender-utils' +import type { AnyStream as AnyStreamType } from './app-render-prerender-utils' // --------------------------------------------------------------------------- -// Shared types (web-only for now; will move to stream-ops.node.ts later) +// Shared types // --------------------------------------------------------------------------- type FlightRenderToReadableStream = ( @@ -25,7 +38,7 @@ type FlightRenderToReadableStream = ( options?: any ) => ReadableStream -export type AnyStream = ReadableStream +export type AnyStream = AnyStreamType export type ContinueStreamSharedOptions = { deploymentId: string | undefined @@ -69,37 +82,102 @@ export type FizzStreamResult = { } // --------------------------------------------------------------------------- -// Continue functions +// Continue function wrappers +// Thin wrappers that accept AnyStream and narrow to +// ReadableStream internally for the web helper functions. // --------------------------------------------------------------------------- -export { - continueStaticPrerender, - continueDynamicPrerender, - continueStaticFallbackPrerender, - continueDynamicHTMLResume, - streamToBuffer, - chainStreams, - createDocumentClosingStream, -} from '../stream-utils/node-web-streams-helper' - -export { processPrelude } from './app-render-prerender-utils' - -/** - * Wrapper for continueFizzStream that accepts AnyStream. - * The underlying implementation expects ReactDOMServerReadableStream but at - * the stream-ops boundary we only expose AnyStream. - */ export function continueFizzStream( renderStream: AnyStream, opts: ContinueFizzStreamOptions -): Promise> { - return webContinueFizzStream(renderStream as any, opts) +): Promise { + return webContinueFizzStream( + renderStream as ReadableStream as any, + { + ...opts, + inlinedDataStream: opts.inlinedDataStream as + | ReadableStream + | undefined, + } + ) } -// Not available in web bundles -export const nodeReadableToWeb: - | ((readable: import('node:stream').Readable) => ReadableStream) - | undefined = undefined +export async function continueStaticPrerender( + prerenderStream: AnyStream, + opts: ContinueStaticPrerenderOptions +): Promise { + return webContinueStaticPrerender( + prerenderStream as ReadableStream, + { + ...opts, + inlinedDataStream: opts.inlinedDataStream as ReadableStream, + } + ) +} + +export async function continueDynamicPrerender( + prerenderStream: AnyStream, + opts: { + getServerInsertedHTML: () => Promise + getServerInsertedMetadata: () => Promise + deploymentId: string | undefined + } +): Promise { + return webContinueDynamicPrerender( + prerenderStream as ReadableStream, + opts + ) +} + +export async function continueStaticFallbackPrerender( + prerenderStream: AnyStream, + opts: ContinueStaticPrerenderOptions +): Promise { + return webContinueStaticFallbackPrerender( + prerenderStream as ReadableStream, + { + ...opts, + inlinedDataStream: opts.inlinedDataStream as ReadableStream, + } + ) +} + +export async function continueDynamicHTMLResume( + renderStream: AnyStream, + opts: ContinueDynamicHTMLResumeOptions +): Promise { + return webContinueDynamicHTMLResume( + renderStream as ReadableStream, + { + ...opts, + inlinedDataStream: opts.inlinedDataStream as ReadableStream, + } + ) +} + +export async function streamToBuffer(stream: AnyStream): Promise { + return webStreamToBuffer(stream as ReadableStream) +} + +export async function streamToUint8Array( + stream: AnyStream +): Promise { + return webStreamToUint8Array(stream as ReadableStream) +} + +export function chainStreams(...streams: AnyStream[]): AnyStream { + return webChainStreams(...(streams as ReadableStream[])) +} + +export function createDocumentClosingStream(): AnyStream { + return webCreateDocumentClosingStream() +} + +export async function processPrelude( + unprocessedPrelude: AnyStream +): Promise<{ prelude: AnyStream; preludeIsEmpty: boolean }> { + return webProcessPrelude(unprocessedPrelude as ReadableStream) +} // --------------------------------------------------------------------------- // Composed helpers @@ -196,3 +274,7 @@ export function pipeRuntimePrefetchTransform( createRuntimePrefetchTransformStream(sentinel, isPartial, staleTime) ) } + +export function teeStream(stream: AnyStream): [AnyStream, AnyStream] { + return (stream as ReadableStream).tee() +} diff --git a/packages/next/src/server/app-render/types.ts b/packages/next/src/server/app-render/types.ts index 6ba05906d68a5..ebf62c386b41f 100644 --- a/packages/next/src/server/app-render/types.ts +++ b/packages/next/src/server/app-render/types.ts @@ -19,6 +19,7 @@ import type { BaseNextRequest } from '../base-http' import type { IncomingMessage } from 'http' import type { RenderResumeDataCache } from '../resume-data-cache/resume-data-cache' import type { ServerCacheStatus } from '../../next-devtools/dev-overlay/cache-indicator' +import type { AnyStream } from './stream-ops' const dynamicParamTypesSchema = s.enums([ 'c', @@ -113,12 +114,12 @@ export interface RenderOptsPartial { setCacheStatus?: (status: ServerCacheStatus, htmlRequestId: string) => void setIsrStatus?: (key: string, value: boolean | undefined) => void setReactDebugChannel?: ( - debugChannel: { readable: ReadableStream }, + debugChannel: { readable: AnyStream }, htmlRequestId: string, requestId: string ) => void sendErrorsToBrowser?: ( - errorsRscStream: ReadableStream, + errorsRscStream: AnyStream, htmlRequestId: string ) => void isBuildTimePrerendering?: boolean diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index 72ce6216e021e..991c82031c89d 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -76,9 +76,17 @@ export function getFlightStream( const { Readable } = require('node:stream') as typeof import('node:stream') - // The types of flightStream and debugStream should match. - if (debugStream && !(debugStream instanceof Readable)) { - throw new InvariantError('Expected debug stream to be a Readable') + // Convert debug stream to Readable if it's a ReadableStream. + // When __NEXT_USE_NODE_STREAMS is enabled, the debug channel produces + // Node Readables natively. Otherwise, it produces web ReadableStreams. + let nodeDebugStream: Readable | undefined + if (debugStream) { + if (debugStream instanceof Readable) { + nodeDebugStream = debugStream + } else { + type WebReadableStream = import('stream/web').ReadableStream + nodeDebugStream = Readable.fromWeb(debugStream as WebReadableStream) + } } // react-server-dom-webpack/client.edge must not be hoisted for require cache clearing to work correctly @@ -96,7 +104,7 @@ export function getFlightStream( { findSourceMapURL, nonce, - debugChannel: debugStream, + debugChannel: nodeDebugStream, endTime: debugEndTime, } ) diff --git a/packages/next/src/server/dev/debug-channel.ts b/packages/next/src/server/dev/debug-channel.ts index a0a052b8426a1..61510afb2970b 100644 --- a/packages/next/src/server/dev/debug-channel.ts +++ b/packages/next/src/server/dev/debug-channel.ts @@ -1,12 +1,22 @@ +import type { Readable } from 'node:stream' import { createBufferedTransformStream } from '../stream-utils/node-web-streams-helper' import { HMR_MESSAGE_SENT_TO_BROWSER, type HmrMessageSentToBrowser, } from './hot-reloader-types' +import type { AnyStream } from '../app-render/stream-ops' + +function toWebReadableStream(stream: AnyStream): ReadableStream { + if (stream instanceof ReadableStream) { + return stream + } + const { Readable: ReadableClass } = + require('node:stream') as typeof import('node:stream') + return ReadableClass.toWeb(stream as Readable) as ReadableStream +} export interface ReactDebugChannelForBrowser { - readonly readable: ReadableStream - // Might also get a writable stream as return channel in the future. + readonly readable: AnyStream } const reactDebugChannelsByHtmlRequestId = new Map< @@ -19,7 +29,7 @@ export function connectReactDebugChannel( debugChannel: ReactDebugChannelForBrowser, sendToClient: (message: HmrMessageSentToBrowser) => void ) { - const reader = debugChannel.readable + const reader = toWebReadableStream(debugChannel.readable) .pipeThrough( // We're sending the chunks in batches to reduce overhead in the browser. createBufferedTransformStream({ maxBufferByteLength: 128 * 1024 }) diff --git a/packages/next/src/server/dev/hot-reloader-types.ts b/packages/next/src/server/dev/hot-reloader-types.ts index 20250260079e8..c620de4a707bb 100644 --- a/packages/next/src/server/dev/hot-reloader-types.ts +++ b/packages/next/src/server/dev/hot-reloader-types.ts @@ -14,6 +14,7 @@ import type { } from '../../next-devtools/dev-overlay/cache-indicator' import type { DevToolsConfig } from '../../next-devtools/dev-overlay/shared' import type { ReactDebugChannelForBrowser } from './debug-channel' +import type { AnyStream } from '../app-render/stream-ops' export const enum HMR_MESSAGE_SENT_TO_BROWSER { // JSON messages: @@ -242,10 +243,7 @@ export interface NextJsHotReloaderInterface { htmlRequestId: string, requestId: string ): void - sendErrorsToBrowser( - errorsRscStream: ReadableStream, - htmlRequestId: string - ): void + sendErrorsToBrowser(errorsRscStream: AnyStream, htmlRequestId: string): void getCompilationErrors(page: string): Promise onHMR( req: IncomingMessage, diff --git a/packages/next/src/server/dev/hot-reloader-webpack.ts b/packages/next/src/server/dev/hot-reloader-webpack.ts index e67450ecca4c0..d4f7a5f0e2afe 100644 --- a/packages/next/src/server/dev/hot-reloader-webpack.ts +++ b/packages/next/src/server/dev/hot-reloader-webpack.ts @@ -5,6 +5,7 @@ import type { Telemetry } from '../../telemetry/storage' import type { IncomingMessage, ServerResponse } from 'http' import type { UrlObject } from 'url' import type { RouteDefinition } from '../route-definitions/route-definition' +import type { AnyStream } from '../app-render/stream-ops' import { type webpack, StringXor } from 'next/dist/compiled/webpack/webpack' import { @@ -1820,7 +1821,7 @@ export default class HotReloaderWebpack implements NextJsHotReloaderInterface { } public sendErrorsToBrowser( - errorsRscStream: ReadableStream, + errorsRscStream: AnyStream, htmlRequestId: string ): void { const client = this.webpackHotMiddleware?.getClient(htmlRequestId) diff --git a/packages/next/src/server/dev/serialized-errors.ts b/packages/next/src/server/dev/serialized-errors.ts index 41774a52ced6b..64f8c32b66a69 100644 --- a/packages/next/src/server/dev/serialized-errors.ts +++ b/packages/next/src/server/dev/serialized-errors.ts @@ -1,16 +1,14 @@ -import { streamToUint8Array } from '../stream-utils/node-web-streams-helper' import { HMR_MESSAGE_SENT_TO_BROWSER, type HmrMessageSentToBrowser, } from './hot-reloader-types' +import type { AnyStream } from '../app-render/stream-ops' +import { streamToUint8Array } from '../app-render/stream-ops' -const errorsRscStreamsByHtmlRequestId = new Map< - string, - ReadableStream ->() +const errorsRscStreamsByHtmlRequestId = new Map() export function sendSerializedErrorsToClient( - errorsRscStream: ReadableStream, + errorsRscStream: AnyStream, sendToClient: (message: HmrMessageSentToBrowser) => void ) { streamToUint8Array(errorsRscStream).then( @@ -43,7 +41,7 @@ export function sendSerializedErrorsToClientForHtmlRequest( export function setErrorsRscStreamForHtmlRequest( htmlRequestId: string, - errorsRscStream: ReadableStream + errorsRscStream: AnyStream ) { // TODO: Clean up after a timeout, in case the client never connects, e.g. // when CURL'ing the page, or loading the page with JavaScript disabled etc. diff --git a/packages/next/src/server/lib/router-utils/router-server-context.ts b/packages/next/src/server/lib/router-utils/router-server-context.ts index fef818d13f746..ca50434541f25 100644 --- a/packages/next/src/server/lib/router-utils/router-server-context.ts +++ b/packages/next/src/server/lib/router-utils/router-server-context.ts @@ -2,6 +2,7 @@ import type { IncomingMessage, ServerResponse } from 'node:http' import type { NextConfigRuntime } from '../../config-shared' import type { UrlWithParsedQuery } from 'node:url' import type { ServerCacheStatus } from '../../../next-devtools/dev-overlay/cache-indicator' +import type { AnyStream } from '../../app-render/stream-ops' export type RevalidateFn = (config: { urlPath: string @@ -40,13 +41,13 @@ export type RouterServerContext = Record< // allow setting ISR status in dev setIsrStatus?: (key: string, value: boolean | undefined) => void setReactDebugChannel?: ( - debugChannel: { readable: ReadableStream }, + debugChannel: { readable: AnyStream }, htmlRequestId: string, requestId: string ) => void setCacheStatus?: (status: ServerCacheStatus, htmlRequestId: string) => void sendErrorsToBrowser?: ( - errorsRscStream: ReadableStream, + errorsRscStream: AnyStream, htmlRequestId: string ) => void // indicates request handlers are already wrapped by next-server tracing diff --git a/packages/next/src/server/pipe-readable.ts b/packages/next/src/server/pipe-readable.ts index 2722c0c474cb8..b4c1939fb9460 100644 --- a/packages/next/src/server/pipe-readable.ts +++ b/packages/next/src/server/pipe-readable.ts @@ -1,4 +1,5 @@ import type { ServerResponse } from 'node:http' +import type { Readable } from 'node:stream' import { ResponseAbortedName, @@ -144,3 +145,97 @@ export async function pipeToNodeResponse( throw new Error('failed to pipe response', { cause: err }) } } + +export async function pipeNodeReadableToNodeResponse( + readable: Readable, + res: ServerResponse, + waitUntilForEnd?: Promise +) { + try { + const { errored, destroyed } = res + if (errored || destroyed) return + + let started = false + + const finished = new DetachedPromise() + + res.once('close', () => { + readable.destroy() + finished.resolve() + }) + + readable.on('data', (chunk: Buffer) => { + if (!started) { + started = true + + if ( + 'performance' in globalThis && + process.env.NEXT_OTEL_PERFORMANCE_PREFIX + ) { + const metrics = getClientComponentLoaderMetrics() + if (metrics) { + performance.measure( + `${process.env.NEXT_OTEL_PERFORMANCE_PREFIX}:next-client-component-loading`, + { + start: metrics.clientComponentLoadStart, + end: + metrics.clientComponentLoadStart + + metrics.clientComponentLoadTimes, + } + ) + } + } + + res.flushHeaders() + getTracer().trace( + NextNodeServerSpan.startResponse, + { + spanName: 'start response', + }, + () => undefined + ) + } + + const ok = res.write(chunk) + + if ('flush' in res && typeof res.flush === 'function') { + res.flush() + } + + if (!ok) { + readable.pause() + res.once('drain', () => { + readable.resume() + }) + } + }) + + readable.on('end', async () => { + if (waitUntilForEnd) { + await waitUntilForEnd + } + + if (!res.writableFinished) { + res.end() + } + + finished.resolve() + }) + + readable.on('error', (err) => { + if (isAbortError(err)) { + finished.resolve() + return + } + + res.destroy(err) + finished.resolve() + }) + + await finished.promise + } catch (err: any) { + if (isAbortError(err)) return + + throw new Error('failed to pipe response', { cause: err }) + } +} diff --git a/packages/next/src/server/render-result.ts b/packages/next/src/server/render-result.ts index 53220cfb7831d..81447c56d3db0 100644 --- a/packages/next/src/server/render-result.ts +++ b/packages/next/src/server/render-result.ts @@ -1,4 +1,5 @@ import type { OutgoingHttpHeaders, ServerResponse } from 'http' +import type { Readable } from 'stream' import type { CacheControl } from './lib/cache-control' import type { FetchMetrics } from './base-http' @@ -8,7 +9,11 @@ import { streamFromString, streamToString, } from './stream-utils/node-web-streams-helper' -import { isAbortError, pipeToNodeResponse } from './pipe-readable' +import { + isAbortError, + pipeToNodeResponse, + pipeNodeReadableToNodeResponse, +} from './pipe-readable' import type { RenderResumeDataCache } from './resume-data-cache/resume-data-cache' import { InvariantError } from '../shared/lib/invariant-error' import type { @@ -74,6 +79,7 @@ export type RenderResultMetadata = AppPageRenderResultMetadata & export type RenderResultResponse = | ReadableStream[] | ReadableStream + | Readable | string | Buffer | null @@ -86,6 +92,16 @@ export type RenderResultOptions< metadata: Metadata } +function isNodeReadable(value: unknown): value is Readable { + return ( + value !== null && + typeof value === 'object' && + typeof (value as Record).pipe === 'function' && + typeof (value as Record).on === 'function' && + !(value instanceof ReadableStream) + ) +} + export default class RenderResult< Metadata extends RenderResultMetadata = RenderResultMetadata, > { @@ -223,6 +239,19 @@ export default class RenderResult< return chainStreams(...this.response) } + if (isNodeReadable(this.response)) { + let Readable: typeof import('node:stream').Readable + if (process.env.TURBOPACK) { + Readable = (require('node:stream') as typeof import('node:stream')) + .Readable + } else { + Readable = ( + __non_webpack_require__('node:stream') as typeof import('node:stream') + ).Readable + } + return Readable.toWeb(this.response) as ReadableStream + } + return this.response } @@ -245,6 +274,17 @@ export default class RenderResult< return this.response } else if (Buffer.isBuffer(this.response)) { return [streamFromBuffer(this.response)] + } else if (isNodeReadable(this.response)) { + let Readable: typeof import('node:stream').Readable + if (process.env.TURBOPACK) { + Readable = (require('node:stream') as typeof import('node:stream')) + .Readable + } else { + Readable = ( + __non_webpack_require__('node:stream') as typeof import('node:stream') + ).Readable + } + return [Readable.toWeb(this.response) as ReadableStream] } else { return [this.response] } @@ -341,6 +381,16 @@ export default class RenderResult< * @param res */ public async pipeToNodeResponse(res: ServerResponse) { + if ( + this.response !== null && + typeof this.response !== 'string' && + !Buffer.isBuffer(this.response) && + !Array.isArray(this.response) && + isNodeReadable(this.response) + ) { + await pipeNodeReadableToNodeResponse(this.response, res, this.waitUntil) + return + } await pipeToNodeResponse(this.readable, res, this.waitUntil) } }