// NOTE: the original capture began with a web-page status banner ("Spaces: Runtime error"); it is not part of this module.
import { EventSourceParserStream } from 'eventsource-parser/stream';
import type { ParsedEvent } from 'eventsource-parser';
/** One incremental update produced while consuming a streamed completion. */
type TextStreamUpdate = {
  /** True on the terminal update; nothing further is yielded after it. */
  done: boolean;
  /** Text delta carried by this update; may be the empty string. */
  value: string;
  /** Citation payload copied verbatim from the parsed event, when present. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  citations?: any;
  /** Error payload copied verbatim from the parsed event, when present. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  error?: any;
  /** Token accounting copied from the parsed event, when present. */
  usage?: ResponseUsage;
};
/** Token accounting attached to a streamed response. */
type ResponseUsage = {
  /** Prompt-side tokens, including images and tools if any. */
  prompt_tokens: number;
  /** Tokens generated for the completion. */
  completion_tokens: number;
  /** Sum of `prompt_tokens` and `completion_tokens`. */
  total_tokens: number;
};
/**
 * Turns a server-sent-events response body into an async generator of text updates.
 *
 * The byte stream is decoded to text, parsed into SSE events, and each event is
 * converted to a `TextStreamUpdate` by `openAIStreamToIterator`.
 *
 * @param responseBody - Raw response body carrying an SSE stream.
 * @param splitLargeDeltas - When true, the updates are additionally routed through
 *   `streamLargeDeltasAsRandomChunks`, which re-emits large deltas as small random-sized chunks.
 * @returns A promise for the generator. Nothing is awaited while building it;
 *   reading from the stream starts when the generator is first iterated.
 */
export async function createOpenAITextStream(
  responseBody: ReadableStream<Uint8Array>,
  splitLargeDeltas: boolean
): Promise<AsyncGenerator<TextStreamUpdate>> {
  const eventReader = responseBody
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new EventSourceParserStream())
    .getReader();

  const updates = openAIStreamToIterator(eventReader);
  return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(updates) : updates;
}
/**
 * Converts parsed SSE events into `TextStreamUpdate`s.
 *
 * Behaviour per event:
 * - reader exhausted, or data starting with `[DONE]`: yields a terminal update and stops;
 * - payload with a truthy `error`: yields a terminal update carrying that error and stops;
 * - payload with truthy `citations`: yields an empty-text update carrying them;
 * - anything else: yields `choices[0].delta.content` (or `''`) together with `usage`.
 *
 * Payloads that cannot be parsed or inspected are reported via `console.error` and skipped.
 *
 * @param reader - Reader over the parsed SSE event stream.
 */
async function* openAIStreamToIterator(
  reader: ReadableStreamDefaultReader<ParsedEvent>
): AsyncGenerator<TextStreamUpdate> {
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      yield { done: true, value: '' };
      break;
    }
    if (!value) {
      continue;
    }

    const data = value.data;
    if (data.startsWith('[DONE]')) {
      yield { done: true, value: '' };
      break;
    }

    try {
      // Deliberately no console.log of the payload here: it would print every
      // streamed chunk of the conversation to the console.
      const parsedData = JSON.parse(data);

      if (parsedData.error) {
        yield { done: true, value: '', error: parsedData.error };
        break;
      }

      if (parsedData.citations) {
        yield { done: false, value: '', citations: parsedData.citations };
        continue;
      }

      yield {
        done: false,
        value: parsedData.choices?.[0]?.delta?.content ?? '',
        usage: parsedData.usage
      };
    } catch (e) {
      // A malformed event must not abort the whole stream; report it and move on.
      console.error('Error extracting delta from SSE event:', e);
    }
  }
}
/**
 * Re-emits large text deltas as a sequence of small random-sized chunks, to simulate
 * fluid streaming even when a provider sends large pieces of text at once.
 *
 * - A terminal (`done`) update is forwarded unchanged and ends the generator.
 * - Updates carrying `citations` or `usage` are forwarded unchanged so that
 *   metadata is never lost by re-chunking.
 * - Deltas shorter than 5 characters are forwarded as a single text update.
 * - Deltas of 5 or more characters are split into chunks of 1-3 characters,
 *   with a short pause after each chunk.
 *
 * @param iterator - Source of text updates.
 */
async function* streamLargeDeltasAsRandomChunks(
  iterator: AsyncGenerator<TextStreamUpdate>
): AsyncGenerator<TextStreamUpdate> {
  for await (const textStreamUpdate of iterator) {
    if (textStreamUpdate.done) {
      yield textStreamUpdate;
      return;
    }

    // Metadata-bearing updates must pass through intact; rebuilding them as
    // plain `{ done, value }` objects would drop the metadata.
    if (textStreamUpdate.citations || textStreamUpdate.usage) {
      yield textStreamUpdate;
      continue;
    }

    const content = textStreamUpdate.value;
    if (content.length < 5) {
      yield { done: false, value: content };
      continue;
    }

    let offset = 0;
    while (offset < content.length) {
      const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length - offset);
      yield { done: false, value: content.slice(offset, offset + chunkSize) };
      offset += chunkSize;

      // Do not sleep if the tab is hidden: timers are throttled to 1s in hidden tabs.
      // `typeof` is required because optional chaining on an undeclared `document`
      // (e.g. outside a browser) throws a ReferenceError.
      if (typeof document === 'undefined' || document.visibilityState !== 'hidden') {
        await sleep(5);
      }
    }
  }
}
/** Returns a promise that resolves (with no value) once a `setTimeout` of `ms` milliseconds fires. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });