/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.subscriber;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.util.Text;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperConfig;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
import org.apache.sling.distribution.journal.impl.subscriber.CommandPoller;
import org.apache.sling.distribution.journal.impl.subscriber.IdleCheck;
import org.apache.sling.distribution.journal.impl.subscriber.NoopIdle;
import org.apache.sling.distribution.journal.impl.subscriber.PreConditionTimeoutException;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberConfiguration;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberIdleCheck;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberReady;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberReadyStore;
import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.OffsetMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.shared.Delay;
import org.apache.sling.distribution.journal.shared.Strings;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.util.converter.Converters;
import org.osgi.util.converter.Converting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * OSGi component that consumes the distribution journal package topic and hands the
 * received packages to a {@link BookKeeper}.
 *
 * <p>Two threads cooperate through the bounded {@link #messageBuffer}:
 * <ul>
 *   <li>the poller created in {@link #activate} calls {@link #handlePackageMessage} and
 *       {@link #handleOffsetMessage}; accepted packages are put into the buffer;</li>
 *   <li>the background thread started in {@link #activate} runs {@link #processQueue}
 *       and takes items from the buffer one at a time.</li>
 * </ul>
 * Both sides poll the {@code running} flag, which {@link #deactivate} clears.
 */
@ParametersAreNonnullByDefault
@Component(service={}, immediate=true, property={"announceDelay=10000"}, configurationPid={"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory"})
@Designate(ocd=SubscriberConfiguration.class, factory=true)
public class DistributionSubscriber {
    /** Upper bound for waiting on the precondition of a single package. */
    private static final long PRECONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60L);
    /** Initial back-off after a failure; also the fixed back-off after a precondition timeout. */
    static long RETRY_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(5L);
    /** Cap passed to the exponential back-off used for unexpected failures. */
    static long MAX_RETRY_DELAY_MILLIS = TimeUnit.MINUTES.toMillis(15L);
    /** Pause between two looks at an empty message buffer. */
    static long QUEUE_FETCH_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(1L);
    /** Creates a fresh exponential back-off sequence; a new one is fetched after each successful item. */
    private static final Supplier<LongSupplier> catchAllDelays = () -> Delay.exponential(RETRY_DELAY_MILLIS, MAX_RETRY_DELAY_MILLIS);
    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
    @Reference(name="packageBuilder")
    private DistributionPackageBuilder packageBuilder;
    @Reference
    private SlingSettingsService slingSettings;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    @Reference(name="precondition")
    private Precondition precondition;
    @Reference
    private MetricsService metricsService;
    @Reference
    BookKeeperFactory bookKeeperFactory;
    @Reference
    private SubscriberReadyStore subscriberReadyStore;
    private SubscriberMetrics subscriberMetrics;
    private volatile Closeable idleReadyCheck;
    private volatile IdleCheck idleCheck;
    private Closeable packagePoller;
    private volatile CommandPoller commandPoller;
    private BookKeeper bookKeeper;
    /** Hand-over buffer between the poller callbacks and the queue thread; holds at most 8 messages. */
    private final BlockingQueue<FullMessage<PackageMessage>> messageBuffer = new LinkedBlockingQueue<FullMessage<PackageMessage>>(8);
    private Set<String> queueNames = Collections.emptySet();
    private Announcer announcer;
    private String subAgentName;
    private String pkgType;
    /** Cleared by {@link #deactivate}; every loop in this class checks it to terminate. */
    private volatile boolean running = true;
    private Thread queueThread;
    private LongSupplier catchAllDelay = catchAllDelays.get();
    private final Delay delay = new Delay();

    /**
     * Wires up metrics, optional command poller and idle check, the book keeper, the
     * package poller, the queue processing thread and the announcer.
     *
     * @param config     subscriber configuration
     * @param context    bundle context, passed to the idle check when it is enabled
     * @param properties component properties; {@code announceDelay} is read from here
     *                   and defaults to 10000
     * @throws NullPointerException if an argument or a mandatory reference is {@code null}
     */
    @Activate
    public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
        // Validate arguments and injected references before the first dereference,
        // otherwise the checks could never fire.
        Objects.requireNonNull(config);
        Objects.requireNonNull(context);
        Objects.requireNonNull(this.metricsService);
        Objects.requireNonNull(this.packageBuilder);
        Objects.requireNonNull(this.slingSettings);
        Objects.requireNonNull(this.messagingProvider);
        Objects.requireNonNull(this.topics);
        Objects.requireNonNull(this.precondition);
        Objects.requireNonNull(this.bookKeeperFactory);
        String subSlingId = Objects.requireNonNull(this.slingSettings.getSlingId());
        this.subAgentName = Strings.requireNotBlank(config.name());
        this.subscriberMetrics = new SubscriberMetrics(this.metricsService, this.subAgentName, this.getFirst(config.agentNames()), config.editable());
        if (config.editable()) {
            this.commandPoller = new CommandPoller(this.messagingProvider, this.topics, subSlingId, this.subAgentName, this.delay::signal);
        }
        if (config.subscriberIdleCheck()) {
            AtomicBoolean readyHolder = this.subscriberReadyStore.getReadyHolder(this.subAgentName);
            this.idleCheck = new SubscriberReady(this.subAgentName, config.idleMillies(), config.forceReadyMillies(), readyHolder, System::currentTimeMillis);
            this.idleReadyCheck = new SubscriberIdleCheck(context, this.idleCheck);
        } else {
            this.idleCheck = new NoopIdle();
        }
        this.queueNames = this.getNotEmpty(config.agentNames());
        this.pkgType = Objects.requireNonNull(this.packageBuilder.getType());
        MessageSender statusSender = this.messagingProvider.createSender(this.topics.getStatusTopic());
        MessageSender logSender = this.messagingProvider.createSender(this.topics.getDiscoveryTopic());
        String packageNodeName = DistributionSubscriber.escapeTopicName(this.messagingProvider.getServerUri(), this.topics.getPackageTopic());
        BookKeeperConfig bkConfig = new BookKeeperConfig(this.subAgentName, subSlingId, config.editable(), config.maxRetries(), config.packageHandling(), packageNodeName, config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
        this.bookKeeper = this.bookKeeperFactory.create(this.packageBuilder, bkConfig, (Consumer<PackageStatusMessage>)statusSender, (Consumer<LogMessage>)logSender, this.subscriberMetrics);
        // Resume right after the last stored offset; without a positive start offset no
        // explicit assignment is made and the poller is created with Reset.latest only.
        long startOffset = this.bookKeeper.loadOffset() + 1L;
        String assign = startOffset > 0L ? this.messagingProvider.assignTo(startOffset) : null;
        this.packagePoller = this.messagingProvider.createPoller(this.topics.getPackageTopic(), Reset.latest, assign, new HandlerAdapter[]{HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)});
        this.queueThread = RunnableUtil.startBackgroundThread(this::processQueue, (String)String.format("Queue Processor for Subscriber agent %s", this.subAgentName));
        int announceDelay = (Integer)((Converting)Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue((Object)10000)).to(Integer.class);
        this.announcer = new Announcer(subSlingId, this.subAgentName, this.queueNames, (Consumer<DiscoveryMessage>)this.messagingProvider.createSender(this.topics.getDiscoveryTopic()), this.bookKeeper, config.maxRetries(), config.editable(), announceDelay);
        LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", new Object[]{this.subAgentName, startOffset, this.queueNames, config.subscriberIdleCheck()});
    }

    /**
     * @param agentNames configured agent names, may be {@code null}
     * @return the first entry, or the empty string when there is none
     */
    private String getFirst(String[] agentNames) {
        return agentNames != null && agentNames.length > 0 ? agentNames[0] : "";
    }

    /**
     * Builds a name of the form {@code <host><escaped path>_<escaped topic>}.
     *
     * @param messagingUri URI whose host and path are used
     * @param topicName    topic name to append
     * @return the combined, escaped name
     */
    public static String escapeTopicName(URI messagingUri, String topicName) {
        return String.format("%s%s_%s", messagingUri.getHost(), DistributionSubscriber.escape(messagingUri.getPath()), DistributionSubscriber.escape(topicName));
    }

    /** Replaces every {@code /} with {@code _} and then escapes characters that are illegal in JCR names. */
    private static String escape(String st) {
        return Text.escapeIllegalJcrChars((String)st.replace("/", "_"));
    }

    /**
     * @param agentNames configured agent names, may be {@code null}
     * @return the non-blank entries as a set; empty when {@code agentNames} is {@code null}
     */
    private Set<String> getNotEmpty(String[] agentNames) {
        // Tolerate null the same way getFirst does.
        String[] names = agentNames != null ? agentNames : new String[0];
        return Arrays.stream(names).filter(StringUtils::isNotBlank).collect(Collectors.toSet());
    }

    /**
     * Closes the announcer, pollers and idle checks, clears {@code running} and waits
     * for the queue thread to end.
     *
     * <p>The queue thread is not interrupted here; it is expected to notice that
     * {@code running} is {@code false} and leave its loop.
     */
    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly((Closeable[])new Closeable[]{this.announcer, this.packagePoller, this.idleReadyCheck, this.idleCheck, this.commandPoller});
        this.running = false;
        // NOTE(review): presumably signal() releases a thread parked in delay.await(..),
        // so the join below does not have to sit out a long back-off - confirm against Delay.
        this.delay.signal();
        if (this.queueThread != null) {
            try {
                this.queueThread.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.info("Join interrupted");
            }
        }
        LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}", new Object[]{this.subAgentName, this.queueNames, this.pkgType});
    }

    /**
     * @return {@code BLOCKED} when the book keeper reports a positive retry sum,
     *         otherwise {@code RUNNING} when messages are buffered, otherwise {@code IDLE}
     */
    public DistributionAgentState getState() {
        boolean isBlocked = this.bookKeeper.getPackageRetries().getSum() > 0;
        if (isBlocked) {
            return DistributionAgentState.BLOCKED;
        }
        return this.messageBuffer.isEmpty() ? DistributionAgentState.IDLE : DistributionAgentState.RUNNING;
    }

    /**
     * Poller callback for package messages: buffers accepted packages and records the
     * offset of rejected ones as skipped.
     */
    private void handlePackageMessage(MessageInfo info, PackageMessage message) {
        if (this.shouldEnqueue(info, message)) {
            this.subscriberMetrics.getPackageJournalDistributionDuration().update(System.currentTimeMillis() - info.getCreateTime(), TimeUnit.MILLISECONDS);
            this.enqueue((FullMessage<PackageMessage>)new FullMessage(info, (Object)message));
        } else {
            try {
                this.bookKeeper.skipPackage(info.getOffset());
            }
            catch (LoginException | PersistenceException e) {
                // Best effort: a failure to store the skipped offset is only logged.
                LOG.warn("Error marking distribution package {} at offset={} as skipped", new Object[]{message, info.getOffset(), e});
            }
        }
    }

    /** Poller callback for offset messages: forwards the message offset to the book keeper. */
    private void handleOffsetMessage(MessageInfo info, OffsetMessage message) {
        this.bookKeeper.handleInitialOffset(info.getOffset());
    }

    /**
     * @return {@code true} when the message comes from a subscribed publisher agent and
     *         carries the package type of the configured package builder
     */
    private boolean shouldEnqueue(MessageInfo info, PackageMessage message) {
        if (!this.queueNames.contains(message.getPubAgentName())) {
            LOG.info("Skipping distribution package {} at offset={} (not subscribed)", (Object)message, (Object)info.getOffset());
            return false;
        }
        if (!this.pkgType.equals(message.getPkgType())) {
            LOG.warn("Skipping distribution package {} at offset={} (bad pkgType)", (Object)message, (Object)info.getOffset());
            return false;
        }
        return true;
    }

    /**
     * Puts the message into the buffer, retrying once per second while the buffer is
     * full and the component is running.
     *
     * @throws RuntimeException if the component stops or the calling thread is
     *                          interrupted before the message could be buffered
     */
    private void enqueue(FullMessage<PackageMessage> message) {
        try {
            while (this.running) {
                if (this.messageBuffer.offer(message, 1000L, TimeUnit.MILLISECONDS)) {
                    this.subscriberMetrics.getItemsBufferSize().increment();
                    return;
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException(String.format("Could not enqueue distribution package at offset=%s (subscriber is stopping or the thread was interrupted)", message.getInfo().getOffset()));
    }

    /**
     * Body of the queue thread: processes buffered items until {@code running} is
     * cleared, backing off after failures.
     */
    private void processQueue() {
        LOG.info("Started Queue processor");
        while (this.running) {
            try {
                this.fetchAndProcessQueueItem();
            }
            catch (PreConditionTimeoutException e) {
                LOG.info(e.getMessage());
                this.backOff(RETRY_DELAY_MILLIS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.debug(e.getMessage());
            }
            catch (Exception e) {
                LOG.error("Error processing queue item", (Throwable)e);
                // Take the next value even when stopping so the sequence advances as before.
                this.backOff(this.catchAllDelay.getAsLong());
            }
        }
        LOG.info("Stopped Queue processor");
    }

    /** Waits for the given time, unless the component is already stopping. */
    private void backOff(long millis) {
        if (this.running) {
            this.delay.await(millis);
        }
    }

    /**
     * Sends any stored status, then processes the head of the buffer. The item is only
     * removed from the buffer after it was processed without an exception, so a failed
     * item is seen again on the next call.
     */
    private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException, ImportPostProcessException {
        this.blockingSendStoredStatus();
        FullMessage<PackageMessage> item = this.blockingPeekQueueItem();
        try (Timer.Context context = this.subscriberMetrics.getProcessQueueItemDuration().time();){
            this.processQueueItem(item);
            this.messageBuffer.remove();
            this.subscriberMetrics.getItemsBufferSize().decrement();
            // Success: start the next failure streak with a fresh back-off sequence.
            this.catchAllDelay = catchAllDelays.get();
        }
    }

    /**
     * Calls {@code bookKeeper.sendStoredStatus} with an increasing retry counter until
     * it returns {@code true}.
     *
     * @throws InterruptedException if the component stops first
     */
    private void blockingSendStoredStatus() throws InterruptedException, IOException {
        try (Timer.Context context = this.subscriberMetrics.getSendStoredStatusDuration().time();){
            int retry = 0;
            while (this.running) {
                if (this.bookKeeper.sendStoredStatus(retry)) {
                    return;
                }
                ++retry;
            }
        }
        throw new InterruptedException("Shutting down");
    }

    /**
     * Waits until the buffer has a head element and returns it without removing it.
     *
     * @throws InterruptedException if the component stops first
     */
    private FullMessage<PackageMessage> blockingPeekQueueItem() throws InterruptedException {
        while (this.running) {
            FullMessage<PackageMessage> message = this.messageBuffer.peek();
            if (message != null) {
                return message;
            }
            this.delay.await(QUEUE_FETCH_DELAY_MILLIS);
        }
        throw new InterruptedException("Shutting down");
    }

    /**
     * Applies one buffered item: removes it when it is to be skipped, invalidates the
     * cache for {@code INVALIDATE} requests, imports it otherwise. The idle check is
     * marked busy for the duration and idle again afterwards, also on failure.
     */
    private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
        MessageInfo info = item.getInfo();
        PackageMessage pkgMsg = (PackageMessage)item.getMessage();
        // Evaluated before the idle check is marked busy; may wait for the precondition.
        boolean skip = this.shouldSkip(info.getOffset());
        PackageMessage.ReqType type = pkgMsg.getReqType();
        try {
            this.idleCheck.busy(this.bookKeeper.getRetries(pkgMsg.getPubAgentName()), info.getCreateTime());
            if (skip) {
                this.bookKeeper.removePackage(pkgMsg, info.getOffset());
            } else if (type == PackageMessage.ReqType.INVALIDATE) {
                this.bookKeeper.invalidateCache(pkgMsg, info.getOffset());
            } else {
                this.bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
            }
        }
        finally {
            this.idleCheck.idle();
        }
    }

    /** The precondition is only consulted when the offset was not cleared by a command. */
    private boolean shouldSkip(long offset) {
        return this.isCleared(offset) || this.isSkipped(offset);
    }

    /** Always {@code false} when no command poller exists, i.e. when the agent is not editable. */
    private boolean isCleared(long offset) {
        return this.commandPoller != null && this.commandPoller.isCleared(offset);
    }

    private boolean isSkipped(long offset) {
        return this.waitPrecondition(offset) == Precondition.Decision.SKIP;
    }

    /**
     * Polls the precondition every 100 ms until it returns something other than
     * {@code WAIT}.
     *
     * @param offset offset of the package to check
     * @return the first decision that is not {@code WAIT}
     * @throws PreConditionTimeoutException if no such decision arrives within
     *         {@link #PRECONDITION_TIMEOUT_MILLIS} or the component stops meanwhile
     */
    private Precondition.Decision waitPrecondition(long offset) {
        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT_MILLIS;
        while (System.currentTimeMillis() < endTime && this.running) {
            Precondition.Decision decision = this.precondition.canProcess(this.subAgentName, offset);
            if (decision == Precondition.Decision.WAIT) {
                this.delay.await(100L);
                continue;
            }
            return decision;
        }
        if (!this.running) {
            // The loop ended because of shutdown, not because time ran out; say so.
            throw new PreConditionTimeoutException(String.format("Shutting down while waiting to meet the preconditions to import the distribution package at offset %s", offset));
        }
        String msg = String.format("Timeout after %s seconds while waiting to meet the preconditions to import the distribution package at offset %s on topic status", TimeUnit.MILLISECONDS.toSeconds(PRECONDITION_TIMEOUT_MILLIS), offset);
        throw new PreConditionTimeoutException(msg);
    }
}

