/*
 * Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.
 */

/**
 * One message of a server-sent event stream.
 *
 * Every property is optional: a property is only present when the
 * corresponding field appeared in the message.
 *
 * @typeParam T - Type of the `data` payload (`string` for a freshly decoded
 * message, something richer once a caller-supplied parser has run).
 */
export type ServerEvent<T> = {
  /** Payload of the message (taken from its `data` field lines). */
  data?: T | undefined;
  /** Value of the message's `event` field. */
  event?: string | undefined;
  /** Value of the message's `id` field. */
  id?: string | undefined;
  /**
   * Numeric value of the message's `retry` field.
   * NOTE(review): the SSE specification defines this as a reconnection delay
   * in milliseconds; nothing in this file interprets the number.
   */
  retry?: number | undefined;
};

/**
 * A `ReadableStream` that turns a raw server-sent-events byte stream into
 * parsed items.
 *
 * Bytes read from the response body are accumulated, cut into messages at
 * blank-line boundaries, decoded, and passed to the caller-supplied `parse`
 * callback. The callback's iterator result decides whether an item is
 * emitted or the stream is finished.
 */
export class EventStream<T extends ServerEvent<unknown>>
  extends ReadableStream<T>
{
  /**
   * @param responseBody - Byte stream carrying the event-stream payload.
   * @param parse - Maps a decoded message to an iterator result. A truthy
   * `value` is enqueued; `done: true` cancels the source and closes this
   * stream; any other result is skipped.
   */
  constructor(
    responseBody: ReadableStream<Uint8Array>,
    parse: (x: ServerEvent<string>) => IteratorResult<T, undefined>,
  ) {
    const source = responseBody.getReader();
    // Bytes received so far that do not yet form a complete message.
    let pending: Uint8Array = new Uint8Array();
    super({
      async pull(controller) {
        try {
          // Loop until exactly one item is enqueued or the stream ends.
          for (;;) {
            const boundary = findBoundary(pending);
            if (boundary === null) {
              // No complete message buffered: read more bytes. Whatever is
              // still pending when the source ends is dropped.
              const result = await source.read();
              if (result.done) {
                controller.close();
                return;
              }
              pending = concatBuffer(pending, result.value);
              continue;
            }

            const frame = pending.slice(0, boundary.index);
            pending = pending.slice(boundary.index + boundary.length);

            const parsed = parseMessage(frame, parse);
            if (parsed?.value) {
              controller.enqueue(parsed.value);
              return;
            }
            if (parsed?.done) {
              await source.cancel("done");
              controller.close();
              return;
            }
            // Comment-only / empty message, or a result with neither a
            // value nor `done`: keep scanning.
          }
        } catch (err) {
          // Surface the failure to consumers first, then release the source.
          controller.error(err);
          await source.cancel(err);
        }
      },
      cancel(reason) {
        return source.cancel(reason);
      },
    });
  }

  // Polyfill for older browsers
  [Symbol.asyncIterator](): AsyncIterableIterator<T> {
    const native = (ReadableStream.prototype as any)[Symbol.asyncIterator];
    if (typeof native === "function") return native.call(this);

    const reader = this.getReader();
    // Releases the reader lock and produces the terminal iterator result.
    const finish = (): IteratorReturnResult<undefined> => {
      reader.releaseLock();
      return { done: true, value: undefined };
    };

    return {
      async next() {
        const result = await reader.read();
        return result.done ? finish() : { done: false, value: result.value };
      },
      async throw(e) {
        await reader.cancel(e);
        return finish();
      },
      async return() {
        await reader.cancel("done");
        return finish();
      },
      [Symbol.asyncIterator]() {
        return this;
      },
    };
  }
}

/**
 * Builds a freshly allocated byte array containing the bytes of `a`
 * immediately followed by the bytes of `b`. Neither input is modified.
 */
function concatBuffer(a: Uint8Array, b: Uint8Array): Uint8Array {
  const headLength = a.length;
  const joined = new Uint8Array(headLength + b.length);
  joined.set(b, headLength);
  joined.set(a);
  return joined;
}

/**
 * Length in bytes of the line terminator starting at `at`:
 * 2 for CR LF, 1 for a lone CR or a lone LF, 0 when there is none.
 */
function lineTerminatorLength(buf: Uint8Array, at: number): number {
  const CR = 13;
  const LF = 10;
  const byte = buf[at]; // undefined past the end, which falls through to 0
  if (byte === CR) return buf[at + 1] === LF ? 2 : 1;
  return byte === LF ? 1 : 0;
}

/**
 * Finds the first message boundary, i.e. the first place where two line
 * terminators follow each other directly (a blank line).
 *
 * Each terminator may independently be CR LF, CR or LF, so besides
 * (CR,LF,CR,LF), (CR,CR) and (LF,LF) this also recognises mixed forms such
 * as (LF,CR,LF) or (CR,LF,LF). Without that, a stream mixing line-ending
 * styles would have consecutive messages merged into one.
 *
 * A CR that is the last byte of `buf` is treated as a complete terminator.
 * If its LF arrives in a later chunk, that LF is left at the front of the
 * remaining bytes as a blank line.
 *
 * @param buf - Bytes received so far.
 * @returns The offset of the boundary and its byte length, or `null` when
 * `buf` contains no boundary yet.
 */
function findBoundary(
  buf: Uint8Array,
): { index: number; length: number } | null {
  const len = buf.length;
  for (let i = 0; i < len; i++) {
    const first = lineTerminatorLength(buf, i);
    if (first === 0) continue;
    const second = lineTerminatorLength(buf, i + first);
    if (second !== 0) return { index: i, length: first + second };
  }
  return null;
}

/**
 * Decodes one raw event-stream message and passes its fields to `parse`.
 *
 * The bytes are decoded as text and split into lines on CR LF, CR or LF.
 * Empty lines and comment lines (starting with `:`) are skipped. For every
 * other line the text before the first colon is the field name and the text
 * after it, minus one optional leading space, is the value. A line without
 * any colon is a field name with an empty value.
 *
 * Recognised fields:
 * - `data`: collected; multiple lines are joined with `\n`.
 * - `event`, `id`: stored as-is (the last occurrence wins).
 * - `retry`: stored as a number, but only when the value consists solely of
 *   ASCII digits; anything else (including an empty value) is ignored.
 *
 * Unknown fields are ignored but still count as content.
 *
 * @param chunk - Bytes of a single message, without the trailing blank line.
 * @param parse - Callback that turns the decoded fields into an iterator
 * result.
 * @returns The result of `parse`, or `undefined` when the message contained
 * nothing but empty lines and comments (in which case `parse` is not called).
 */
function parseMessage<T extends ServerEvent<unknown>>(
  chunk: Uint8Array,
  parse: (x: ServerEvent<string>) => IteratorResult<T, undefined>,
): IteratorResult<T, undefined> | undefined {
  const text = new TextDecoder().decode(chunk);
  const lines = text.split(/\r\n|\r|\n/);
  const asciiDigits = /^[0-9]+$/;
  const dataLines: string[] = [];
  const ret: ServerEvent<string> = {};
  let ignore = true;
  for (const line of lines) {
    if (!line || line.startsWith(":")) continue;
    ignore = false;

    // No colon: the whole line is the field name and the value is empty.
    // (Slicing with the -1 from indexOf would chop the last character off
    // the name and treat the entire line as the value.)
    const colon = line.indexOf(":");
    let field = line;
    let value = "";
    if (colon !== -1) {
      field = line.slice(0, colon);
      value = line.slice(colon + 1);
      // Exactly one leading space is part of the separator, not the value.
      if (value.startsWith(" ")) value = value.slice(1);
    }

    if (field === "data") dataLines.push(value);
    else if (field === "event") ret.event = value;
    else if (field === "id") ret.id = value;
    else if (field === "retry") {
      // Digits only: Number("") is 0 and Number("1e3") is 1000, neither of
      // which is a valid retry value.
      if (asciiDigits.test(value)) ret.retry = Number(value);
    }
  }
  if (ignore) return undefined;
  if (dataLines.length) ret.data = dataLines.join("\n");
  return parse(ret);
}
