import { Inject, Injectable } from '@angular/core';
import { WINDOW } from '../../app/injection-tokens';
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

/**
 * The FileReaderService reads files from the local file system in fixed-size chunks,
 * keeping a bounded number of chunks in flight (being read or processed) at the same time.
 */
@Injectable()
export class FileReaderService {
    constructor(@Inject(WINDOW) private window: Window) {}

    /**
     * Reads a file progressively, in chunks of a given size, with up to concurrencyLimit chunks in flight at once.
     *
     * Chunks may finish reading in any order, but callbackProgress is always invoked in file order: a chunk
     * is handed to callbackProgress only after every preceding chunk has been handed over. The progress
     * callbacks themselves may still overlap, because the next chunk becomes eligible as soon as the
     * previous one has been handed over, not when its observable has emitted.
     *
     * A new chunk is scheduled each time the observable returned by callbackProgress emits, so that
     * observable is expected to emit exactly once per chunk. When every chunk has been processed,
     * callbackFinal is executed once. For an empty file there is nothing to read and callbackFinal is
     * executed immediately.
     *
     * NOTE: no error handler is installed on the underlying FileReader or on the progress subscription, so
     * a failed read never emits and the overall operation will not reach callbackFinal.
     *
     * @param file File to be read
     * @param chunkSize Size of the chunks in bytes; must be a positive finite number
     * @param concurrencyLimit How many chunks can be read and processed (executed its progress callback) at the same time; must be at least 1
     * @param callbackProgress Callback to be executed after each chunk is read (receives read data as param)
     * @param callbackFinal Final callback executed once after all chunks are read and processed
     * @throws RangeError if chunkSize is not a positive finite number or concurrencyLimit is below 1
     */
    readFileInChunksConcurrently(
        file: File,
        chunkSize: number,
        concurrencyLimit: number,
        callbackProgress: (data: ArrayBuffer) => Observable<void>,
        callbackFinal: () => void,
    ): void {
        // Fail fast on arguments that would otherwise make the read loop spin forever or never start.
        if (!Number.isFinite(chunkSize) || chunkSize <= 0) {
            throw new RangeError(`chunkSize must be a positive finite number, got ${chunkSize}`);
        }
        if (!(concurrencyLimit >= 1)) {
            throw new RangeError(`concurrencyLimit must be at least 1, got ${concurrencyLimit}`);
        }

        // An empty file has no chunks: nothing would ever trigger the final callback, so do it right away.
        if (file.size === 0) {
            callbackFinal();
            return;
        }

        // How long an out-of-order chunk waits before checking again whether it is its turn.
        const retryDelayMs = 1000;

        // since reading the file and executing the callbackProgress are not naturally synchronized,
        // we must sync manually using two pointers:
        let nextOffsetToRead = 0; // offset of the next chunk that has not been requested yet
        let nextOffsetToProcess = 0; // offset of the next chunk allowed to be handed to callbackProgress

        const totalNumberOfChunks = Math.ceil(file.size / chunkSize);
        let chunksProcessed = 0; // how many chunks have been read and processed (aka the callbackProgress for it has emitted)

        /**
         * Starts reading the next unread chunk and returns an observable that runs callbackProgress on its data.
         * Since simultaneous reads can finish in any order, the chunk is only released to the callback
         * once all previous chunks have been released; until then it polls every retryDelayMs.
         */
        const readAndProcessOneChunk = (): Observable<void> => {
            const chunkReady = new Subject<ArrayBuffer>();
            // capture this chunk's offset now: nextOffsetToRead moves on before the read completes
            const readerOffset = nextOffsetToRead;
            const partialViewOfFile = file.slice(readerOffset, readerOffset + chunkSize);
            const reader = new FileReader();

            reader.onload = (): void => {
                /**
                 * Emits chunkReady with the reader result only after previous chunks have been processed
                 */
                const tryToProcessChunk = (): void => {
                    if (nextOffsetToProcess === readerOffset) {
                        // let the following chunk through before emitting, so ordering is by hand-over
                        nextOffsetToProcess = readerOffset + chunkSize;
                        chunkReady.next(reader.result as ArrayBuffer);
                        chunkReady.complete();
                    } else {
                        this.window.setTimeout(tryToProcessChunk, retryDelayMs);
                    }
                };
                tryToProcessChunk();
            };

            reader.readAsArrayBuffer(partialViewOfFile);
            nextOffsetToRead += chunkSize;

            return chunkReady.asObservable().pipe(switchMap(callbackProgress));
        };

        /**
         * Determines whether or not another chunk should be read, and whether or not to execute the final callback.
         */
        const afterChunkProcessed = (): void => {
            chunksProcessed++;
            // strictly less than: an offset equal to file.size means there are no bytes left to read,
            // which happens whenever the file size is an exact multiple of chunkSize
            if (nextOffsetToRead < file.size) {
                readAndProcessOneChunk().subscribe(afterChunkProcessed);
                return;
            }
            if (chunksProcessed === totalNumberOfChunks) {
                callbackFinal();
            }
        };

        // trigger initial simultaneous reads
        const initialReads = Math.min(concurrencyLimit, totalNumberOfChunks);
        for (let i = 0; i < initialReads; i++) {
            readAndProcessOneChunk().subscribe(afterChunkProcessed);
        }
    }
}
