package org.jboss.soa.esb.actions.aggregator;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.AbstractActionPipelineProcessor;
import org.jboss.soa.esb.actions.ActionLifecycleException;
import org.jboss.soa.esb.actions.ActionProcessingException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;

/* loaded from: input_file:org/jboss/soa/esb/actions/aggregator/AbstractAggregator.class */
public abstract class AbstractAggregator extends AbstractActionPipelineProcessor {
    /** Logger shared by the aggregator and its nested helper classes. */
    private static final Logger LOGGER = Logger.getLogger(AbstractAggregator.class);
    /** Timeout in milliseconds (600000 ms = 10 minutes) used when no "timeoutInMillis" attribute is configured. */
    private static final long DEFAULT_TIMEOUT = 600000;
    /**
     * Timeout in milliseconds. It is added to the creation time of each entry to
     * obtain its expiry, and is also how long the timeout processor waits when the
     * entry list is empty.
     */
    private final long timeout;
    /** Background expiry task; created and started by initialise(), stopped by destroy(). */
    private TimeoutProcessor timeoutProcessor;
    /**
     * Sentinel of the circular, doubly linked list of entries (initialise() links it
     * to itself). New entries are linked in just before the sentinel, so the entry
     * after the sentinel is the oldest. The sentinel also serves as the monitor
     * guarding the list and {@link #seriesUUIDtoEntry}.
     */
    private AggregatorEntry head;
    /** Series UUID to live entry; read and written only while synchronized on {@link #head}. */
    private Map<String, AggregatorEntry> seriesUUIDtoEntry;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jboss/soa/esb/actions/aggregator/AbstractAggregator$AggregatorEntry.class */
    /**
     * Book-keeping for one message series: the partially aggregated message, the
     * counters used to detect completion, a lock serialising work on the series,
     * and the links that place the entry in the aggregator's doubly linked list.
     */
    public final class AggregatorEntry {
        /** Absolute time in milliseconds (creation time plus the aggregator timeout). */
        private final long expiry;
        /** Identifier of the series this entry aggregates. */
        private final String seriesUUID;
        /** Expected number of messages in the series; 0 until it has been set. */
        private int sequenceCount;
        /** Number of messages counted so far via {@link #incAggregateCount()}. */
        private int aggregateCount;
        /** Aggregated message under construction, or null when none is held. */
        private Message message;
        /** Lock exposed through {@link #lock()} and {@link #unlock()}. */
        private final Lock lock = new ReentrantLock();
        /** Predecessor in the list; null once the entry has been unlinked. */
        private AggregatorEntry previousEntry;
        /** Successor in the list; null once the entry has been unlinked. */
        private AggregatorEntry nextEntry;

        /**
         * Creates an entry whose expiry is the current time plus the enclosing
         * aggregator's timeout.
         *
         * @param seriesUUID identifier of the series
         */
        AggregatorEntry(String seriesUUID) {
            this.seriesUUID = seriesUUID;
            this.expiry = System.currentTimeMillis() + AbstractAggregator.this.timeout;
        }

        /** @return the absolute expiry time in milliseconds */
        long getExpiry() {
            return expiry;
        }

        /** @return the identifier of the series */
        String getSeriesUUID() {
            return seriesUUID;
        }

        /** @param count the expected number of messages in the series */
        void setSequenceCount(int count) {
            sequenceCount = count;
        }

        /** @return the expected number of messages, or 0 if it was never set */
        int getSequenceCount() {
            return sequenceCount;
        }

        /** @param aggregate the aggregated message to hold, or null to clear it */
        void setMessage(Message aggregate) {
            message = aggregate;
        }

        /** @return the aggregated message currently held, or null */
        Message getMessage() {
            return message;
        }

        /**
         * Counts one more aggregated message.
         *
         * @return the count after the increment
         */
        int incAggregateCount() {
            return ++aggregateCount;
        }

        /** Acquires this entry's lock, blocking until it is available. */
        void lock() {
            lock.lock();
        }

        /** Releases this entry's lock. */
        void unlock() {
            lock.unlock();
        }

        /** @param entry the new predecessor in the list */
        void setPreviousEntry(AggregatorEntry entry) {
            previousEntry = entry;
        }

        /** @return the predecessor in the list, or null once unlinked */
        AggregatorEntry getPreviousEntry() {
            return previousEntry;
        }

        /** @param entry the new successor in the list */
        void setNextEntry(AggregatorEntry entry) {
            nextEntry = entry;
        }

        /** @return the successor in the list, or null once unlinked */
        AggregatorEntry getNextEntry() {
            return nextEntry;
        }

        /**
         * Reports whether this entry is no longer linked into the list, judged by
         * the absence of a successor.
         *
         * @return true if the entry has no successor
         */
        boolean hasExpired() {
            return null == nextEntry;
        }
    }

    /* loaded from: input_file:org/jboss/soa/esb/actions/aggregator/AbstractAggregator$TimeoutProcessor.class */
    /**
     * Background task that discards entries whose expiry time has passed.
     *
     * <p>The task holds the monitor of the list sentinel for its whole loop and
     * only gives it up while waiting, so list manipulation by other threads
     * happens while this task is parked in {@code wait}.
     */
    private final class TimeoutProcessor implements Runnable {
        /** Worker thread, or null while the processor is not running. */
        private Thread thread;
        /** Loop flag; cleared by {@link #stop()} to make {@link #run()} return. */
        private volatile boolean active;

        private TimeoutProcessor() {
        }

        /**
         * Repeatedly inspects the entry after the sentinel: waits for the
         * configured timeout if the list is empty, waits for the remaining time if
         * that entry has not expired yet, and removes it otherwise. Returns once
         * {@link #stop()} has cleared the active flag.
         */
        @Override // java.lang.Runnable
        public void run() {
            AbstractAggregator.LOGGER.debug("Starting timeout processor");
            synchronized (AbstractAggregator.this.head) {
                while (this.active) {
                    final long now = System.currentTimeMillis();
                    final AggregatorEntry oldest = AbstractAggregator.this.head.getNextEntry();
                    try {
                        if (oldest == AbstractAggregator.this.head) {
                            // The sentinel points at itself: nothing to expire.
                            if (AbstractAggregator.LOGGER.isDebugEnabled()) {
                                AbstractAggregator.LOGGER.debug("Empty aggregator list, waiting for " + AbstractAggregator.this.timeout);
                            }
                            AbstractAggregator.this.head.wait(AbstractAggregator.this.timeout);
                        } else if (oldest.getExpiry() > now) {
                            final long remaining = oldest.getExpiry() - now;
                            if (AbstractAggregator.LOGGER.isDebugEnabled()) {
                                AbstractAggregator.LOGGER.debug("First entry not expired, waiting for " + remaining);
                            }
                            AbstractAggregator.this.head.wait(remaining);
                        } else {
                            if (AbstractAggregator.LOGGER.isDebugEnabled()) {
                                AbstractAggregator.LOGGER.debug("Timing out entry for " + oldest.getSeriesUUID());
                            }
                            AbstractAggregator.this.removeEntry(oldest);
                        }
                    } catch (InterruptedException ie) {
                        // Shutdown is signalled through the active flag, not through
                        // interruption, so keep looping. The interrupt status is
                        // deliberately not restored here: wait() cleared it, and
                        // re-setting it would make every following wait() throw
                        // immediately and turn this loop into a busy spin.
                        if (AbstractAggregator.LOGGER.isDebugEnabled()) {
                            AbstractAggregator.LOGGER.debug("Timeout processor interrupted while waiting, rechecking state");
                        }
                    }
                }
            }
            AbstractAggregator.LOGGER.debug("Stopping timeout processor");
        }

        /** Starts the worker thread; does nothing if one has already been started. */
        public void start() {
            if (this.thread == null) {
                this.thread = new Thread(this);
                this.active = true;
                this.thread.start();
            }
        }

        /**
         * Asks the worker to finish, wakes it if it is waiting and blocks until it
         * has terminated. Does nothing if the processor is not running. If the
         * calling thread is interrupted while waiting, a warning is logged and the
         * caller's interrupt status is restored.
         */
        public void stop() {
            if (this.thread != null) {
                this.active = false;
                synchronized (AbstractAggregator.this.head) {
                    // Wake the worker so it notices the cleared flag promptly.
                    AbstractAggregator.this.head.notify();
                }
                try {
                    this.thread.join();
                } catch (InterruptedException ie) {
                    AbstractAggregator.LOGGER.warn("Unexpected interruption while waiting for timeout processor to stop");
                    // join() cleared the flag; put it back so callers further up
                    // the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                this.thread = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractAggregator(ConfigTree configTree) throws ConfigurationException {
        String attribute = configTree.getAttribute("timeoutInMillis");
        if (attribute == null) {
            this.timeout = DEFAULT_TIMEOUT;
        } else {
            try {
                this.timeout = Long.valueOf(attribute).longValue();
            } catch (NumberFormatException e) {
                throw new ConfigurationException("Failed to parse timeout " + attribute);
            }
        }
    }

    /**
     * Prepares the aggregator for use: creates the list sentinel linked to itself
     * (an empty circular list), creates the series lookup map, and starts the
     * timeout processor.
     *
     * @throws ActionLifecycleException declared by the lifecycle contract; not thrown here
     */
    @Override // org.jboss.soa.esb.actions.AbstractActionLifecycle, org.jboss.soa.esb.actions.ActionLifecycle
    public void initialise() throws ActionLifecycleException {
        // The sentinel carries no series UUID; it only anchors the list and acts as the monitor.
        this.head = new AggregatorEntry(null);
        this.head.setNextEntry(this.head);
        this.head.setPreviousEntry(this.head);
        // Parameterized explicitly rather than using the raw type, avoiding an unchecked conversion.
        this.seriesUUIDtoEntry = new HashMap<String, AggregatorEntry>();
        // Start the processor last: it synchronizes on the sentinel, which must already exist.
        this.timeoutProcessor = new TimeoutProcessor();
        this.timeoutProcessor.start();
    }

    /**
     * Stops the timeout processor and drops the reference to it. Safe to call when
     * the aggregator was never initialised or has already been destroyed; in that
     * case there is no processor and the call does nothing.
     *
     * @throws ActionLifecycleException declared by the lifecycle contract; not thrown here
     */
    @Override // org.jboss.soa.esb.actions.AbstractActionLifecycle, org.jboss.soa.esb.actions.ActionLifecycle
    public void destroy() throws ActionLifecycleException {
        final TimeoutProcessor processor = this.timeoutProcessor;
        if (processor == null) {
            // Nothing was started (or it was already stopped); avoid a NullPointerException.
            return;
        }
        processor.stop();
        this.timeoutProcessor = null;
    }

    /**
     * Folds the incoming message into the aggregate of its series.
     *
     * <p>A message whose AggregateDetails.AGGREGATE_DETAILS property is missing or
     * is not an AggregateDetails instance is returned untouched. Otherwise the
     * entry for the series is retrieved (and locked), an aggregate message is
     * created on first use, and {@link #aggregateMessage} is invoked.
     *
     * @param message the incoming message
     * @return the incoming message if it carries no aggregation details; the
     *         aggregate once the number of counted messages reaches a non-zero
     *         sequence count; null while the series is still incomplete; or the
     *         current aggregate if {@code aggregateMessage} returned false
     * @throws ActionProcessingException if {@code aggregateMessage} fails
     */
    @Override // org.jboss.soa.esb.actions.ActionPipelineProcessor
    public Message process(Message message) throws ActionProcessingException {
        final Object candidate = message.getProperties().getProperty(AggregateDetails.AGGREGATE_DETAILS);
        if (!(candidate instanceof AggregateDetails)) {
            // instanceof is false for null, so this also covers a missing property.
            return message;
        }
        final AggregateDetails details = (AggregateDetails) candidate;
        final String seriesUUID = details.getSeriesUUID();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Aggregating message into " + seriesUUID);
        }
        final AggregatorEntry entry = retrieveEntry(seriesUUID);
        try {
            Message aggregate = entry.getMessage();
            if (aggregate == null) {
                // First message seen for this entry: start a new aggregate.
                aggregate = MessageFactory.getInstance().getMessage();
                aggregate.getProperties().setProperty(AggregateDetails.SERIES_UUID, seriesUUID);
                entry.setMessage(aggregate);
            }
            if (!aggregateMessage(aggregate, details.getMessageSequence(), message)) {
                // NOTE(review): the partial aggregate is handed back when the subclass
                // declines the message - confirm this is the intended contract.
                return aggregate;
            }
            final int aggregated = entry.incAggregateCount();
            final Integer declaredCount = details.getSequenceCount();
            if (declaredCount != null) {
                entry.setSequenceCount(declaredCount.intValue());
                aggregate.getProperties().setProperty(AggregateDetails.SEQUENCE_COUNT, declaredCount);
            }
            final int expected = entry.getSequenceCount();
            final boolean complete = expected != 0 && aggregated >= expected;
            if (!complete) {
                return null;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Aggregated message for " + seriesUUID + ", aggregateCount " + aggregated);
            }
            // Clearing the message makes releaseEntry() remove the finished entry.
            entry.setMessage(null);
            return aggregate;
        } finally {
            releaseEntry(entry);
        }
    }

    /**
     * Returns the entry for the given series with its lock held, creating and
     * linking a new entry at the tail of the list when none exists. The caller
     * must hand the entry back through {@link #releaseEntry}.
     *
     * <p>The removal check is performed only after the entry lock has been
     * acquired. A thread can block on the lock while the current holder completes
     * the series and removes the entry; checking before locking would let the
     * blocked thread continue with an entry that is no longer in the map. When the
     * entry turns out to have been removed, the lock is released and the lookup is
     * retried, which yields a fresh entry.
     *
     * @param str the series UUID
     * @return the locked, currently linked entry for the series
     */
    private AggregatorEntry retrieveEntry(String str) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Retrieving entry for " + str);
        }
        while (true) {
            AggregatorEntry entry;
            synchronized (this.head) {
                entry = this.seriesUUIDtoEntry.get(str);
                if (entry == null) {
                    entry = new AggregatorEntry(str);
                    this.seriesUUIDtoEntry.put(str, entry);
                    // Link the new entry between the current tail and the sentinel.
                    final AggregatorEntry tail = this.head.getPreviousEntry();
                    entry.setNextEntry(this.head);
                    this.head.setPreviousEntry(entry);
                    entry.setPreviousEntry(tail);
                    tail.setNextEntry(entry);
                }
            }
            // Never take the entry lock while holding the list monitor: other code
            // acquires them in the order entry lock -> list monitor.
            entry.lock();
            final boolean removed;
            synchronized (this.head) {
                // Links are only modified under the list monitor, so read them there.
                removed = entry.hasExpired();
            }
            if (!removed) {
                return entry;
            }
            entry.unlock();
        }
    }

    /**
     * Hands back an entry obtained from {@link #retrieveEntry}: removes it from
     * the list if it no longer holds an aggregated message, then releases its
     * lock. The unlock happens in a {@code finally} block so the lock cannot be
     * leaked if logging or removal throws.
     *
     * @param aggregatorEntry the entry whose lock the calling thread holds
     */
    private void releaseEntry(AggregatorEntry aggregatorEntry) {
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Releasing entry for " + aggregatorEntry.getSeriesUUID());
            }
            if (aggregatorEntry.getMessage() == null) {
                removeEntry(aggregatorEntry);
            }
        } finally {
            aggregatorEntry.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Unlinks an entry from the list and drops it from the series lookup map,
     * under the list monitor. An entry that has already been unlinked is left
     * alone, so the method may be called more than once for the same entry.
     *
     * @param aggregatorEntry the entry to remove
     */
    public void removeEntry(AggregatorEntry aggregatorEntry) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Removing entry for " + aggregatorEntry.getSeriesUUID());
        }
        synchronized (this.head) {
            if (aggregatorEntry.hasExpired()) {
                // Already unlinked by an earlier call.
                return;
            }
            this.seriesUUIDtoEntry.remove(aggregatorEntry.getSeriesUUID());
            final AggregatorEntry successor = aggregatorEntry.getNextEntry();
            final AggregatorEntry predecessor = aggregatorEntry.getPreviousEntry();
            // Clear the entry's own links first; a null successor marks it as removed.
            aggregatorEntry.setNextEntry(null);
            aggregatorEntry.setPreviousEntry(null);
            // Close the gap between the former neighbours.
            successor.setPreviousEntry(predecessor);
            predecessor.setNextEntry(successor);
        }
    }

    /**
     * Merges one incoming message into the aggregate of its series. Called by
     * {@link #process(Message)} while the series entry is locked.
     *
     * @param message  the aggregated message for the series, as passed by process()
     * @param num      the message sequence taken from the incoming message's
     *                 aggregate details (presumably its position in the series;
     *                 whether it may be null is not visible here - TODO confirm)
     * @param message2 the incoming message being processed
     * @return true if process() should count this message towards completion of
     *         the series; when false, process() skips the counting and returns the
     *         aggregate as it stands
     * @throws ActionProcessingException if the message cannot be aggregated
     */
    protected abstract boolean aggregateMessage(Message message, Integer num, Message message2) throws ActionProcessingException;
}
