import { MonitoringAPI } from '@/app/api';
import { useAxios } from '@vue-composable/axios';
import { computed, ref } from '@vue/composition-api';
import dayjs from 'dayjs';
import { ApolloAPI, KafkaAPI } from '../api';
import { KafkaProcessingOptions, RetrievalType, TaskStatus } from '../constants';
import { HarvesterBlockId, HarvesterBlockIds, PreprocessingBlockId } from '../constants/apollo-block.constants';
import { WorkflowStatus } from '../constants/apollo-pipeline.constants';
import {
    ApiHarvesterConfiguration,
    ApolloTask,
    ApolloTaskConfiguration,
    BigQueryHarvesterConfiguration,
    FileHarvesterConfiguration,
    KafkaHarvesterConfiguration,
    MappingConfiguration,
    SQLHarvesterConfiguration,
} from '../types';
import { ApolloPipeline } from '../types/apollo-pipeline.type';

/**
 * Composable that loads a single Apollo pipeline and exposes reactive state derived from it
 * (its harvester / mapping / loader tasks, status flags, harvester-kind checks) together with
 * actions for re-fetching the pipeline and resetting a suspended one.
 *
 * @param pipelineId - id of the pipeline passed to `ApolloAPI.get` and `MonitoringAPI.resetExecutions`.
 * @param onFetchErrorCallback - optional function invoked with the caught error when fetching fails.
 * @param fetchOnLoad - when true (default) the pipeline is fetched immediately on creation.
 * @returns the reactive refs, computed flags and helper functions listed at the bottom of this function.
 */
export function useApolloPipeline(pipelineId: string, onFetchErrorCallback?: Function, fetchOnLoad = true) {
    const { exec, error, loading } = useAxios(true);
    const pipeline = ref<ApolloPipeline>();

    /**
     * Fetches the pipeline and stores the response payload in `pipeline`.
     * Failures never propagate to the caller: they are handed to `onFetchErrorCallback` when one
     * was provided and are otherwise only observable through the `error` ref.
     */
    const fetchPipeline = async () => {
        try {
            const res = await exec(ApolloAPI.get(pipelineId));
            pipeline.value = res?.data;
        } catch (e: any) {
            if (onFetchErrorCallback instanceof Function) onFetchErrorCallback(e);
        }
    };

    // First task whose block id is one of the known harvester block ids (undefined until loaded).
    const harvester = computed(() => {
        return pipeline.value?.tasks.find((task) => HarvesterBlockIds.includes(task.blockId as any));
    });

    // The mapping preprocessing task, if the pipeline has one.
    const mapping = computed(() => {
        return pipeline.value?.tasks.find((task) => task.blockId === PreprocessingBlockId.Mapping) as
            | ApolloTask<MappingConfiguration>
            | undefined;
    });

    // The loader preprocessing task, if the pipeline has one.
    const loader = computed(() => {
        return pipeline.value?.tasks.find((task) => task.blockId === PreprocessingBlockId.Loader);
    });

    const isFinalized = computed(() => pipeline.value?.status === WorkflowStatus.Ready);
    const isSuspended = computed(() => pipeline.value?.status === WorkflowStatus.Suspended);
    // "Under revise": the pipeline is being updated while its loader task is already in Ready status.
    const isUnderRevise = computed(
        () => pipeline.value?.status === WorkflowStatus.Updating && loader.value?.status === TaskStatus.Ready,
    );
    // Streaming harvesters are the Kafka and MQTT ones (internal and external variants).
    const isStreaming = computed(() =>
        [
            HarvesterBlockId.Kafka,
            HarvesterBlockId.ExternalKafka,
            HarvesterBlockId.MQTT,
            HarvesterBlockId.ExternalMQTT,
        ].includes(harvester.value?.blockId as HarvesterBlockId),
    );
    const isLargeFilesHarvester = computed(() => harvester.value?.blockId === HarvesterBlockId.LargeFiles);

    // A pipeline with a runner id is treated as on-premise.
    const isOnPremise = computed(() => !!pipeline.value?.runnerId);

    // Long running: any MQTT harvester, or a Kafka harvester configured for real-time processing.
    // Evaluates to `undefined` (not `false`) while there is no harvester.
    const isLongRunning = computed(
        () =>
            harvester.value &&
            ([HarvesterBlockId.MQTT, HarvesterBlockId.ExternalMQTT].includes(
                harvester.value.blockId as HarvesterBlockId,
            ) ||
                ([HarvesterBlockId.Kafka, HarvesterBlockId.ExternalKafka].includes(
                    harvester.value.blockId as HarvesterBlockId,
                ) &&
                    (harvester.value.configuration as KafkaHarvesterConfiguration).processing ===
                        KafkaProcessingOptions.RealTime)),
    );

    // True when a streaming harvester's retrieval end date (made inclusive) already lies in the past.
    const expiredRetrieveUntilDate = computed(() => {
        if (!isStreaming.value) return false;
        // add 1 extra day in order to make the end date inclusive
        const inclusiveDate = dayjs((harvester.value?.configuration as any)?.retrieval?.endDate).add(1, 'day');
        return inclusiveDate.isBefore(dayjs().utc());
    });

    const hasAnonymisation = computed(
        () => !!pipeline.value?.tasks.find((task) => task.blockId === PreprocessingBlockId.Anonymisation),
    );

    /**
     * Resets the executions of the pipeline and, optionally, the Kafka topic of its harvester.
     *
     * @param resetKafka - when true the harvester's Kafka topic / consumer group is reset as well.
     * @throws Error when the "retrieve until" date has expired; the same error is stored in `error`
     *         and no request is sent. Request failures surface through `exec`.
     */
    const resetSuspended = async (resetKafka: boolean) => {
        if (expiredRetrieveUntilDate.value) {
            // Replace the error instead of mutating `error.value.message`: that assignment throws an
            // unrelated TypeError whenever no error object is currently stored in the ref, and the
            // message asks the user to fix the date before continuing, so abort before any reset.
            const expiredError = new Error(
                'Retrieve Until Date is in the past. Please update it accordingly to continue.',
            );
            error.value = expiredError;
            throw expiredError;
        }
        await exec(MonitoringAPI.resetExecutions(pipelineId));

        if (resetKafka) {
            // NOTE(review): assumes the harvester is a Kafka one with connection details present;
            // the destructuring below throws otherwise — confirm callers only pass true in that case.
            const { topic, groupId } = (harvester.value
                ?.configuration as KafkaHarvesterConfiguration)?.connectionDetails;
            await exec(KafkaAPI.resetTopic(topic!, groupId!));
        }
    };

    // The guards below decide on the current harvester's block id; `config` itself is not inspected.
    const isFileHarvester = (config: ApolloTaskConfiguration): config is FileHarvesterConfiguration =>
        harvester.value?.blockId === HarvesterBlockId.File;
    const isAPIHarvester = (config: ApolloTaskConfiguration): config is ApiHarvesterConfiguration =>
        harvester.value?.blockId === HarvesterBlockId.Api;

    const isDatabaseHarvester = (
        config: ApolloTaskConfiguration,
    ): config is BigQueryHarvesterConfiguration | SQLHarvesterConfiguration =>
        harvester.value
            ? [HarvesterBlockId.BigQuery, HarvesterBlockId.SQL].includes(harvester.value.blockId as any)
            : false;

    /**
     * Tells whether the harvester retrieves data repeatedly, i.e. its retrieval type is neither
     * `Once` nor `Immediately`. Returns false when there is no harvester or no retrieval type yet
     * (e.g. before the pipeline has been fetched) instead of throwing.
     */
    const isPeriodicOrPolling = () => {
        const retrievalType = (harvester.value?.configuration as
            | ApiHarvesterConfiguration
            | BigQueryHarvesterConfiguration
            | SQLHarvesterConfiguration
            | undefined)?.retrieval?.type as RetrievalType | undefined;
        if (retrievalType == null) return false;
        return ![RetrievalType.Once, RetrievalType.Immediately].includes(retrievalType);
    };

    if (fetchOnLoad) fetchPipeline();

    return {
        pipeline,
        loading,
        harvester,
        mapping,
        loader,
        isFinalized,
        isSuspended,
        isUnderRevise,
        isStreaming,
        isOnPremise,
        isLongRunning,
        isLargeFilesHarvester,
        expiredRetrieveUntilDate,
        error,
        hasAnonymisation,
        isFileHarvester,
        isAPIHarvester,
        isDatabaseHarvester,
        resetSuspended,
        fetchPipeline,
        isPeriodicOrPolling,
    };
}
