/**
 * Here so mqtt lib works on the browser
 * @see {https://github.com/mqttjs/MQTT.js/issues/1474}
 * @see {https://github.com/mqttjs/MQTT.js/pull/1497}
 */
import 'core-js/modules/web.immediate'

import { connect, type ISubscriptionGrant, type MqttClient, validations } from 'mqtt'

import { BulkEvent, delay, Droppable, isArrayNotEmpty, Queue } from '../../../utils'

import { isTopicSubscriptionResponseSuccessful, type TopicSubscriptionResponse } from '../../../domain'

import type { TopFrameWrapper } from '../../workers'
import { TopicMatchingHelper } from '..'
import type { MqttMessageFromWorker, MqttMessageToWorker, SerializableMqttTopicMessageEvent } from './Types'
import { parsePayload } from './PayloadParser'

/**
 * Matches shared-subscription topic filters of the form `$share/<ShareName>/<filter>`.
 *
 * The share name is exactly one level, while the filter that follows it is a regular
 * topic filter and may therefore span any number of levels
 * (e.g. `$share/group/sensors/+/temperature`).
 */
const sharedTopicPattern = /^\$share\/[^/]+\/.+$/

/**
 * Selects, from the message type `TMessage`, the member(s) whose `action` is `TAction`
 * and strips the `action` property, leaving only that message's arguments.
 * Members whose `action` does not match resolve to `never`.
 */
type ActionType<TMessage extends MqttMessageToWorker | MqttMessageFromWorker, TAction extends string> =
    TMessage extends { action: TAction }
        ? Omit<TMessage, 'action'>
        : never

/**
 * Checks topics before they are handed to the mqtt client.
 *
 * @todo wait for the validation code to be merged into master
 * @see {https://github.com/mqttjs/MQTT.js/blob/main/lib/validations.js#L12}
 * @see {https://github.com/mqttjs/MQTT.js/pull/1498}
 */
class Validator {
    /**
     * @param maxDepth Highest number of `/`-separated levels a topic may have
     */
    constructor (private readonly maxDepth: number) {}

    /**
     * Tells whether a topic is shallow enough and well formed.
     *
     * @param topic Topic to check
     * @returns `false` when the topic has more levels than `maxDepth`, otherwise
     * whatever the mqtt topic-parts validation reports for its levels
     */
    isValid (topic: string): boolean {
        const levels = topic.split('/')
        const isShallowEnough = levels.length <= this.maxDepth

        /** Topics that are too deep never reach the mqtt validation */
        return isShallowEnough && validations.validateTopicParts(levels)
    }
}

/**
 * Wraps one mqtt.js connection: exposes its lifecycle events, batches incoming
 * messages and runs subscription changes one at a time through a queue.
 */
class ConnectedClient {
    /** Granted `qos` value (0x80, SUBACK failure) signalling the broker rejected the subscription */
    private static readonly subscriptionFailureCode = 128

    private readonly droppable = new Droppable()

    private readonly subscriptionQueue = new Queue()

    private readonly client: Pick<MqttClient, 'on' | 'once' | 'off' | 'subscribe' | 'unsubscribe' | 'removeAllListeners' | 'end'>

    private readonly timeout: number

    private readonly cycle: number

    private readonly validator: Validator

    /** Topics granted by the last subscription flow; they are unsubscribed when the next flow starts */
    private previousTopics: string[] = []

    /** Set by `disconnect()` so pending callbacks of a torn-down client stay silent */
    private disconnected = false

    /**
     * Starts connecting right away.
     *
     * `timeout` is used both as the mqtt `connectTimeout` and as the `onLoad` deadline,
     * `cycle` is handed to the message batcher and `maxTopicDepth` to the topic validator.
     */
    constructor ({ url, username, password, timeout, cycle, maxTopicDepth }: ActionType<MqttMessageToWorker, 'connect'>) {
        this.client = connect(url, { username, password, connectTimeout: timeout })
        this.timeout = timeout
        this.cycle = cycle
        this.validator = new Validator(maxTopicDepth)
    }

    /**
     * Promise flavour of the callback based `subscribe`.
     * Never rejects: the error, if any, is the first element of the resolved tuple.
     */
    private promisifiedSubscribe (topics: string[]): Promise<[error: Error | null, granted: ISubscriptionGrant[]]> {
        return new Promise((resolve) => this.client.subscribe(topics, (error, granted) => resolve([error, granted])))
    }

    /** Registers a handler for every `error` event of the underlying client */
    onError (handler: (e: Error) => void): void {
        this.client.on('error', handler)
    }

    /**
     * Reports, once, whether the connection was established within `timeout`.
     *
     * Nothing is reported after `disconnect()`: the `connect` listener is gone by then,
     * so the time-out would otherwise always win and signal a failure for a client
     * that was discarded on purpose.
     *
     * @param handler Receives `true` on connection, `false` when the time-out elapsed first
     */
    onLoad (handler: (connected: boolean) => void): void {
        /** Connection promise */
        const connection = new Promise<boolean>((resolve) => this.client.once('connect', () => resolve(true)))
        /** Time-out promise */
        const timeOut = delay(this.timeout).then(() => false)

        void Promise.race([connection, timeOut]).then((connected) => {
            if (!this.disconnected) {
                handler(connected)
            }
        })
    }

    /**
     * Calls `handler` each time a reconnection attempt succeeds.
     *
     * Every `reconnect` event arms two one-shot listeners: whichever of `close`
     * (attempt failed) or `connect` (attempt succeeded) fires first disarms the other.
     */
    onReconnect (handler: VoidFunction): void {
        this.client.on('reconnect', () => {
            const dropListeners: VoidFunction = () => {
                // Failed to reconnect (So give up and drop everything)
                this.client.off('close', dropListeners)
                this.client.off('connect', handleReconnection)
            }

            const handleReconnection: VoidFunction = () => {
                // Make sure nothing else gets fired
                dropListeners()
                // Finally signal reconnection was made
                handler()
            }

            this.client.once('close', dropListeners)
            this.client.once('connect', handleReconnection)
        })
    }

    /**
     * Collects incoming messages and hands them to `handler` in batches.
     *
     * @param handler Receives the `[topic, payload]` pairs gathered by the batcher
     */
    onBulk (handler: (batch: [topics: string, payload: Buffer][]) => void): void {
        const bulkEvent = new BulkEvent<[topics: string, payload: Buffer]>(this.cycle, true)

        /** Listen to messages once connected */
        this.client.on('message', (topic, payload) => bulkEvent.append([topic, payload]))

        /**
         * Every cycle, send this up the chain.
         * Whatever `onCycled` returns is registered to run on drop (presumably its unsubscribe handle).
         */
        this.droppable.onDrop(bulkEvent.onCycled((batch) => handler(batch)))
    }

    /** Removes every listener, ends the connection and releases whatever was registered on the droppable */
    disconnect (): void {
        this.disconnected = true
        this.client.removeAllListeners()
        this.client.end()
        this.droppable.drop()
    }

    /**
     * Replaces the current subscription with `topics`.
     *
     * Flows are queued, so one only starts after the previous one is done.
     *
     * @param topics Topics to subscribe to
     * @returns Every requested topic sorted into the bucket describing its outcome
     */
    subscribe (topics: string[]): Promise<TopicSubscriptionResponse> {
        /** Wait for the last subscription flow to be done and then start the next one */
        return this.subscriptionQueue.push(async () => {
            const { previousTopics } = this

            /** Unsubscribe from previous topics if there are any */
            if (isArrayNotEmpty(previousTopics)) {
                this.client.unsubscribe(previousTopics)
            }

            const validTopics: typeof topics = []
            const response: TopicSubscriptionResponse = { subscribed: [], noPermissions: [], invalid: [], unknown: [], shared: [] }

            /** Strip out topics that are either malformed, too deep or shared subscriptions */
            topics.forEach((topic) => {
                if (!this.validator.isValid(topic)) {
                    response.invalid.push(topic)
                } else if (sharedTopicPattern.test(topic)) {
                    response.shared.push(topic)
                } else {
                    validTopics.push(topic)
                }
            })

            /** Subscribe to new topics if there are any and validate subscription was successful */
            if (isArrayNotEmpty(validTopics)) {
                const [error, granted] = await this.promisifiedSubscribe(validTopics)

                if (error) {
                    /** Error is not expected, so it is handled as a resubscribable error */
                    response.unknown.push(...validTopics)
                } else {
                    granted.forEach(({ qos, topic }) => {
                        if (qos === ConnectedClient.subscriptionFailureCode) {
                            /** This handles lack of authorization when attempting to subscribe to a topic */
                            response.noPermissions.push(topic)
                        } else {
                            /** Topic was properly subscribed */
                            response.subscribed.push(topic)
                        }
                    })
                }
            }

            /** Remember what was granted so the next flow can unsubscribe from it */
            this.previousTopics = response.subscribed

            return response
        })
    }
}

/**
 * This webworker is here in order to prevent
 * the massive amount of mqtt topic messages
 * from overwhelming the UI event loop
 *
 * It will collect whatever messages it receives in an amount of time
 * and trigger one single event with all collected messages
 *
 * It depends on two things
 *  - The internal connection to mqtt
 *  - The parcel plugin that deals with wrapping this code to be put inside the web worker
 */
export class MqttWorker {
    private readonly helper = new TopicMatchingHelper()

    /** Current connection, `null` while the worker is disconnected */
    private client: ConnectedClient | null = null

    /** Topics granted by the latest subscription of the current connection */
    private topics: string[] = []

    /**
     * @param worker Channel used to receive commands from, and send events to, the top frame
     */
    constructor (private readonly worker: TopFrameWrapper<MqttMessageFromWorker, MqttMessageToWorker>) {}

    /**
     * Runs `onClient` with the current client, or `onNoClient` when there is none.
     * By default a missing client is reported to the top frame as a state error.
     */
    private withClient (onClient: (client: ConnectedClient) => Promise<void> | void, onNoClient: VoidFunction = this.sendStateError): void {
        if (this.client) {
            void onClient(this.client)
        } else {
            onNoClient()
        }
    }

    /** Sends a message to the top frame, or a state error instead when there is no client */
    private send (message: MqttMessageFromWorker): void {
        this.withClient(() => this.worker.send(message))
    }

    /** Arrow property so it can be passed around as a callback without losing `this` */
    private readonly sendStateError = (): void => {
        this.worker.send({ action: 'error', type: 'state', message: "Worker's state is not properly setup" })
    }

    /**
     * Creates the client and wires its events to the top frame.
     * Connecting while a client already exists is reported as a state error.
     */
    private connect (args: ActionType<MqttMessageToWorker, 'connect'>): void {
        this.withClient(this.sendStateError, () => {
            const client = (this.client = new ConnectedClient(args))

            /** Handle errors */
            client.onError((e) => {
                this.send({ action: 'error', type: 'genericError', message: e.message })
                this.disconnect()
            })

            /** Wait for load */
            client.onLoad((connected) => {
                /** Make sure connection is made with-in the accepted timeframe */
                if (!connected) {
                    this.send({ action: 'error', type: 'couldNotConnect', message: 'Could not connect to the mqtt client' })
                    return
                }

                client.onBulk((batch) => {
                    /** Without subscriptions nothing can match, so skip the whole batch at once */
                    if (!isArrayNotEmpty(this.topics)) {
                        return
                    }

                    const topicalBatch: SerializableMqttTopicMessageEvent[] = []

                    for (const [topic, payload] of batch) {
                        const sources = this.helper.findTopicSources(topic, this.topics)

                        /** Only pay for payload parsing when the message is going to be forwarded */
                        if (isArrayNotEmpty(sources)) {
                            const [serializedPayload, possibleTypes] = parsePayload(payload)

                            topicalBatch.push({ topic, possibleTypes, payload: serializedPayload, timestamp: this.getCurrentTime(), sources })
                        }
                    }

                    if (isArrayNotEmpty(topicalBatch)) {
                        this.send({ action: 'messages', batch: topicalBatch, topics: this.topics })
                    }
                })
            })

            /** Detect reconnections, and re-subscribe to all topics */
            client.onReconnect(() => {
                this.applySubscription(this.topics, (response) => {
                    if (!isTopicSubscriptionResponseSuccessful(response)) {
                        /** Had issues resubscribing, should never occur in real life scenarios, but this covers all bases */
                        this.send({ action: 'error', type: 'couldNotReconnect', message: 'There was an issue completing reconnection' })
                    }
                })
            })
        })
    }

    /**
     * Tears the current client down (a missing client is reported as a state error)
     * and forgets its topics, so a later connection never inherits the subscriptions
     * of a previous one.
     */
    private disconnect (): void {
        this.withClient((client) => client.disconnect())
        this.client = null
        this.topics = []
    }

    /**
     * Actually performs the subscription, handles exceptions and updates internal controls
     *
     * @param topics Topics to subscribe to
     * @param handler Receives the outcome of the subscription
     */
    private applySubscription (topics: string[], handler: (response: TopicSubscriptionResponse) => void): void {
        this.withClient(async (client) => {
            const response = await client.subscribe(topics)

            /** A client replaced or dropped while subscribing must not overwrite the current topics */
            if (this.client === client) {
                this.topics = response.subscribed
            }

            handler(response)
        })
    }

    /**
     * Subscribes and signals top frame what subscription was actually made
     */
    private subscribe ({ id, topics }: ActionType<MqttMessageToWorker, 'subscribe'>): void {
        this.applySubscription(topics, (response) => this.send({ id, action: 'subscription', response }))
    }

    /** This is exposed to enable tests mocks */
    // eslint-disable-next-line class-methods-use-this
    protected getCurrentTime (): Date {
        return new Date()
    }

    /** Starts listening to the top frame and dispatches each command by its `action` */
    start (): void {
        this.worker.listen((data) => {
            switch (data.action) {
                case 'connect':
                    this.connect(data)
                    break
                case 'disconnect':
                    this.disconnect()
                    break
                case 'subscribe':
                    this.subscribe(data)
                    break
            }
        })
    }
}
