/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.purgatory;

import com.yammer.metrics.core.Meter;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DeleteRecordsPartitionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A delayed operation that tracks a delete-records request across a set of partitions.
 *
 * <p>Each partition carries a {@link DeleteRecordsPartitionStatus}. The operation is
 * considered complete once no partition reports {@code acksPending()}; at that point the
 * per-partition response statuses are handed to the response callback.
 */
public class DelayedDeleteRecords extends DelayedOperation {
    private static final Logger LOG = LoggerFactory.getLogger(DelayedDeleteRecords.class);
    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedDeleteRecordsMetrics");
    /** Meter marked on expiration with the number of partitions that were still pending. */
    private static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);

    /** Read-only view over the caller's status map; the status objects themselves are mutated in place. */
    private final Map<TopicPartition, DeleteRecordsPartitionStatus> deleteRecordsStatus;
    /** Invoked from {@link #tryComplete()} for every partition whose acks are still pending. */
    private final BiConsumer<TopicPartition, DeleteRecordsPartitionStatus> onAcksPending;
    /** Receives the per-partition results when the operation completes. */
    private final Consumer<Map<TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult>> responseCallback;

    /**
     * Creates the operation and initialises the pending flag of every partition.
     *
     * <p>A partition whose response currently carries {@code Errors.NONE} is flagged as
     * acks-pending and its error code is provisionally set to {@code Errors.REQUEST_TIMED_OUT};
     * any other partition is flagged as not pending and keeps its error code.
     *
     * @param delayMs             delay passed to the {@link DelayedOperation} constructor
     * @param deleteRecordsStatus per-partition status; wrapped in an unmodifiable view (not copied)
     * @param onAcksPending       callback run for each still-pending partition on every completion attempt
     *                            (presumably expected to update the status - confirm against the caller)
     * @param responseCallback    callback that receives the final per-partition results
     */
    public DelayedDeleteRecords(long delayMs, Map<TopicPartition, DeleteRecordsPartitionStatus> deleteRecordsStatus, BiConsumer<TopicPartition, DeleteRecordsPartitionStatus> onAcksPending, Consumer<Map<TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult>> responseCallback) {
        super(delayMs);
        this.onAcksPending = onAcksPending;
        this.deleteRecordsStatus = Collections.unmodifiableMap(deleteRecordsStatus);
        this.responseCallback = responseCallback;
        deleteRecordsStatus.forEach(DelayedDeleteRecords::initialisePartitionStatus);
    }

    /**
     * Sets the initial acks-pending flag (and provisional timeout error) for one partition.
     */
    private static void initialisePartitionStatus(TopicPartition partition, DeleteRecordsPartitionStatus partitionStatus) {
        boolean noErrorYet = partitionStatus.responseStatus().errorCode() == Errors.NONE.code();
        partitionStatus.setAcksPending(noErrorYet);
        if (noErrorYet) {
            // NOTE(review): presumably the timeout code is what gets reported if the
            // partition is never updated before the operation finishes - confirm.
            partitionStatus.responseStatus().setErrorCode(Errors.REQUEST_TIMED_OUT.code());
        }
        LOG.trace("Initial partition status for {} is {}", partition, partitionStatus);
    }

    /**
     * Gives every still-pending partition to the {@code onAcksPending} callback, then forces
     * completion if no partition remains pending.
     *
     * @return {@code false} while at least one partition is still pending; otherwise the
     *         result of {@code forceComplete()}
     */
    public boolean tryComplete() {
        this.deleteRecordsStatus.forEach(this::notifyIfAcksPending);
        boolean anyStillPending = this.deleteRecordsStatus.values().stream().anyMatch(DeleteRecordsPartitionStatus::acksPending);
        if (anyStillPending) {
            return false;
        }
        return this.forceComplete();
    }

    /**
     * Traces the partition's current status and runs the pending callback when acks are outstanding.
     */
    private void notifyIfAcksPending(TopicPartition partition, DeleteRecordsPartitionStatus partitionStatus) {
        LOG.trace("Checking delete records satisfaction for {}, current status {}", partition, partitionStatus);
        if (!partitionStatus.acksPending()) {
            // Nothing outstanding for this partition; skip the callback.
            return;
        }
        this.onAcksPending.accept(partition, partitionStatus);
    }

    /**
     * Marks the expiration meter once per partition that is still waiting on acks.
     */
    public void onExpiration() {
        long pendingPartitions = this.deleteRecordsStatus.values().stream().filter(DeleteRecordsPartitionStatus::acksPending).count();
        AGGREGATE_EXPIRATION_METER.mark(pendingPartitions);
    }

    /**
     * Collects the current response status of every partition and passes the resulting map
     * to the response callback.
     */
    public void onComplete() {
        Map<TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult> resultsByPartition = this.deleteRecordsStatus.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().responseStatus()));
        this.responseCallback.accept(resultsByPartition);
    }
}

