package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AbstractTask.class */
public abstract class AbstractTask implements Task {
    private static final long NO_DEADLINE = -1;
    protected Set<TopicPartition> inputPartitions;
    protected final Logger log;
    protected final LogContext logContext;
    protected final String logPrefix;
    protected final TaskId id;
    protected final ProcessorTopology topology;
    protected final StateDirectory stateDirectory;
    protected final ProcessorStateManager stateMgr;
    private final long taskTimeoutMs;
    private Task.State state = Task.State.CREATED;
    private long deadlineMs = -1;
    protected Map<TopicPartition, Long> offsetSnapshotSinceLastFlush = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractTask(TaskId taskId, ProcessorTopology processorTopology, StateDirectory stateDirectory, ProcessorStateManager processorStateManager, Set<TopicPartition> set, long j, String str, Class<? extends AbstractTask> cls) {
        this.id = taskId;
        this.stateMgr = processorStateManager;
        this.topology = processorTopology;
        this.inputPartitions = set;
        this.stateDirectory = stateDirectory;
        this.taskTimeoutMs = j;
        this.logPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()) + String.format("%s [%s] ", str, taskId);
        this.logContext = new LogContext(this.logPrefix);
        this.log = this.logContext.logger(cls);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void maybeWriteCheckpoint(boolean z) {
        Map<TopicPartition, Long> changelogOffsets = this.stateMgr.changelogOffsets();
        if (StateManagerUtil.checkpointNeeded(z, this.offsetSnapshotSinceLastFlush, changelogOffsets)) {
            this.stateMgr.flush();
            this.stateMgr.checkpoint();
            this.offsetSnapshotSinceLastFlush = new HashMap(changelogOffsets);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public TaskId id() {
        return this.id;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Set<TopicPartition> inputPartitions() {
        return this.inputPartitions;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Collection<TopicPartition> changelogPartitions() {
        return this.stateMgr.changelogPartitions();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
        this.stateMgr.markChangelogAsCorrupted(collection);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public StateStore getStore(String str) {
        return this.stateMgr.getStore(str);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public final Task.State state() {
        return this.state;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void revive() {
        if (this.state != Task.State.CLOSED) {
            throw new IllegalStateException("Illegal state " + state() + " while reviving task " + this.id);
        }
        clearTaskTimeout();
        transitionTo(Task.State.CREATED);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void transitionTo(Task.State state) {
        Task.State state2 = state();
        if (!state2.isValidTransition(state)) {
            throw new IllegalStateException("Invalid transition from " + state2 + " to " + state);
        }
        this.state = state;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void updateInputPartitions(Set<TopicPartition> set, Map<String, List<String>> map) {
        this.inputPartitions = set;
        this.topology.updateSourceTopics(map);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void maybeInitTaskTimeoutOrThrow(long j, Exception exc) {
        if (this.deadlineMs == -1) {
            this.deadlineMs = j + this.taskTimeoutMs;
        } else if (j > this.deadlineMs) {
            String format = String.format("Task %s did not make progress within %d ms. Adjust `%s` if needed.", this.id, Long.valueOf((j - this.deadlineMs) + this.taskTimeoutMs), StreamsConfig.TASK_TIMEOUT_MS_CONFIG);
            if (exc == null) {
                throw new StreamsException((Throwable) new TimeoutException(format), this.id);
            }
            throw new StreamsException((Throwable) new TimeoutException(format, exc), this.id);
        }
        if (exc != null) {
            this.log.debug(String.format("Task did not make progress. Remaining time to deadline %d; retrying.", Long.valueOf(this.deadlineMs - j)), exc);
        } else {
            this.log.debug("Task did not make progress. Remaining time to deadline {}; retrying.", Long.valueOf(this.deadlineMs - j));
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void clearTaskTimeout() {
        if (this.deadlineMs != -1) {
            this.log.debug("Clearing task timeout.");
            this.deadlineMs = -1L;
        }
    }
}
