package com.litesalt.batch.handler;

import com.litesalt.batch.QueueStatusMonitor;
import com.litesalt.batch.entity.RowBatchQueue;
import java.util.List;
import java.util.Observable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/litesalt/batch/handler/RowBatchHandler.class */
public abstract class RowBatchHandler<T> extends Observable {
    protected RowBatchQueue<T> queue;
    protected long submitCapacity;
    protected Class<T> clazz;
    protected final Logger logger = Logger.getLogger(RowBatchHandler.class);
    protected AtomicLong loopSize = new AtomicLong(0);
    private ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public RowBatchHandler(long j, Class<T> cls) {
        addObserver(new QueueStatusMonitor(this));
        this.clazz = cls;
        this.submitCapacity = j;
    }

    public abstract void rowBatch(List<T> list);

    public void insertWithBatch(List<T> list) {
        try {
            if (this.queue != null && list != null && list.size() > 0) {
                this.queue.put((List) list);
                this.loopSize.addAndGet(list.size());
                if (this.loopSize.get() >= this.submitCapacity) {
                    this.threadPool.submit(new Thread() { // from class: com.litesalt.batch.handler.RowBatchHandler.1
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            try {
                                RowBatchHandler.this.rowBatch(RowBatchHandler.this.take(RowBatchHandler.this.submitCapacity));
                            } catch (Exception e) {
                                RowBatchHandler.this.logger.error("批次插入发生异常", e);
                            }
                        }
                    });
                    this.loopSize.set(0L);
                    setChanged();
                    notifyObservers();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public List<T> take(long j) {
        try {
            if (this.queue != null) {
                return this.queue.take(j);
            }
            return null;
        } catch (Exception e) {
            this.logger.error("take is interrupted", e);
            return null;
        }
    }

    public List<T> takeAll() {
        try {
            if (this.queue != null) {
                return this.queue.takeAll();
            }
            return null;
        } catch (Exception e) {
            this.logger.error("take is interrupted", e);
            return null;
        }
    }

    public Class<T> getClazz() {
        return this.clazz;
    }
}
