package edu.sc.seis.crocus.cassandra;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.TokenRangeOfflineException;
import com.netflix.astyanax.retry.ExponentialBackoff;
import edu.iris.Fissures.IfNetwork.ChannelId;
import edu.iris.Fissures.model.UnitImpl;
import edu.sc.seis.cormorant.seismogram.BudLightWriter;
import edu.sc.seis.cormorant.seismogram.DataRecordQueue;
import edu.sc.seis.crocus.CrocusException;
import edu.sc.seis.crocus.cassandra.timedData.AbstractCoverage;
import edu.sc.seis.crocus.cassandra.timedData.Coverage;
import edu.sc.seis.crocus.cassandra.timedData.MiniSeedTimedData;
import edu.sc.seis.crocus.cassandra.timedData.NoCoverage;
import edu.sc.seis.crocus.cassandra.timedData.TimedData;
import edu.sc.seis.crocus.importProcess.ImportProcessor;
import edu.sc.seis.fissuresUtil.time.MicroSecondTimeRange;
import edu.sc.seis.seisFile.mseed.DataRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:edu/sc/seis/crocus/cassandra/CrocusWriter.class */
public class CrocusWriter implements BudLightWriter {
    Keyspace keyspace;
    Integer timeToLiveSeconds;
    AstyanaxContext<Keyspace> context;
    DataRecordQueue inQueue;
    int numRecords;
    List<ImportProcessor> importProcessors;
    public static final byte[] EMPTY_BYTES = new byte[0];
    private static final Logger logger = LoggerFactory.getLogger(CrocusWriter.class);

    public CrocusWriter(AstyanaxContext<Keyspace> astyanaxContext, DataRecordQueue dataRecordQueue) {
        this(astyanaxContext, dataRecordQueue, null, new ArrayList());
    }

    public CrocusWriter(AstyanaxContext<Keyspace> astyanaxContext, DataRecordQueue dataRecordQueue, Integer num, List<ImportProcessor> list) {
        this.timeToLiveSeconds = null;
        this.numRecords = 0;
        this.importProcessors = new ArrayList();
        this.inQueue = dataRecordQueue;
        this.context = astyanaxContext;
        this.timeToLiveSeconds = num;
        this.keyspace = (Keyspace) astyanaxContext.getClient();
        this.importProcessors.addAll(list);
    }

    public void run() {
        logger.info("crocus run " + this.inQueue.getSize());
        while (true) {
            try {
                if (this.inQueue.getSize() != 0) {
                    List<DataRecord> work = this.inQueue.getWork();
                    try {
                        outputToCassandra(new NSLCDay(work.get(0)), work);
                    } catch (ConnectionException e) {
                        logger.error("ConnectionException writing data records, send back to queue (" + this.inQueue.getSize() + "), first is " + work.get(0));
                        if (!this.inQueue.isQueueFull()) {
                            Iterator<DataRecord> it = work.iterator();
                            while (it.hasNext()) {
                                this.inQueue.addWorkNoWait(it.next());
                            }
                        }
                    } catch (TokenRangeOfflineException e2) {
                        logger.error("TokenRangeOfflineException writing data records, send back to queue (" + this.inQueue.getSize() + "), first is " + work.get(0));
                        if (!this.inQueue.isQueueFull()) {
                            Iterator<DataRecord> it2 = work.iterator();
                            while (it2.hasNext()) {
                                this.inQueue.addWorkNoWait(it2.next());
                            }
                        }
                    }
                } else {
                    synchronized (this.inQueue) {
                        this.inQueue.wait();
                    }
                }
            } catch (Exception e3) {
                logger.error("exception", e3);
                return;
            }
        }
    }

    public DataRecordQueue getQueue() {
        return this.inQueue;
    }

    public Integer getTimeToLiveSeconds() {
        return this.timeToLiveSeconds;
    }

    public void sendNoDataToCache(ChannelId channelId, MicroSecondTimeRange microSecondTimeRange) {
        try {
            HashMap<NSLCDay, MicroSecondTimeRange> splitOverKeys = NSLCDay.splitOverKeys(channelId, microSecondTimeRange);
            MutationBatch prepareMutationBatch = this.keyspace.prepareMutationBatch();
            for (NSLCDay nSLCDay : splitOverKeys.keySet()) {
                ColumnListMutation withRow = prepareMutationBatch.withRow(MSeedColumnFamilyUtil.getMSeedColFamilyDef(), nSLCDay);
                NoCoverage noCoverage = new NoCoverage(splitOverKeys.get(nSLCDay).getBeginTime(), splitOverKeys.get(nSLCDay).getEndTime());
                withRow.putColumn(noCoverage.getPrefixedDate(), noCoverage.getEndTime(), this.timeToLiveSeconds);
            }
            prepareMutationBatch.execute();
        } catch (ConnectionException e) {
            throw new RuntimeException("unable to put nodata in cache", e);
        }
    }

    protected void outputToCassandra(NSLCDay nSLCDay, List<DataRecord> list) throws IOException, ConnectionException, CrocusException {
        if (this.numRecords > 1000) {
            logger.debug("Save " + nSLCDay + " " + list.size() + " out of queue=" + this.inQueue.getSize());
            this.numRecords = 0;
        }
        Coverage coverage = null;
        ArrayList arrayList = new ArrayList(list.size());
        for (DataRecord dataRecord : list) {
            MiniSeedTimedData miniSeedTimedData = new MiniSeedTimedData(dataRecord);
            arrayList.add(miniSeedTimedData);
            Date date = miniSeedTimedData.getDate();
            Date time = dataRecord.getHeader().getPredictedNextStartBtime().convertToCalendar().getTime();
            if (coverage == null) {
                coverage = new Coverage(date, time);
            } else if (((float) Math.abs(date.getTime() - coverage.getEndTime().getTime())) >= 1.0f / dataRecord.getHeader().getSampleRate()) {
                arrayList.add(coverage);
                coverage = new Coverage(date, time);
            } else if (AbstractCoverage.MAX_COVERAGE.getValue(UnitImpl.MILLISECOND) < time.getTime() - coverage.getDate().getTime()) {
                arrayList.add(coverage);
                coverage = new Coverage(date, time);
            } else {
                coverage = new Coverage(coverage.getDate(), time);
            }
        }
        if (coverage != null) {
            arrayList.add(coverage);
        }
        timedDataToCassandra(nSLCDay, arrayList);
        Iterator<ImportProcessor> it = this.importProcessors.iterator();
        while (it.hasNext()) {
            it.next().processDataRecords(nSLCDay, list);
        }
        this.numRecords++;
    }

    protected OperationResult<Void> timedDataToCassandra(NSLCDay nSLCDay, List<TimedData> list) throws CrocusException, IOException, ConnectionException {
        MutationBatch withRetryPolicy = this.keyspace.prepareMutationBatch().withRetryPolicy(new ExponentialBackoff(100, 3));
        ColumnListMutation withRow = withRetryPolicy.withRow(MSeedColumnFamilyUtil.getMSeedColFamilyDef(), nSLCDay);
        for (TimedData timedData : list) {
            withRow = withRow.putColumn(timedData.getPrefixedDate(), timedData.toBytes(), this.timeToLiveSeconds);
        }
        return withRetryPolicy.execute();
    }

    public AstyanaxContext<Keyspace> getContext() {
        return this.context;
    }
}
