package org.apache.qpid.client;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RangeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:qpid-client-M4.jar:org/apache/qpid/client/BasicMessageConsumer_0_10.class */
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> {
    protected final Logger _logger;
    private MessageFilter _filter;
    private AMQSession_0_10 _0_10session;
    private boolean _preAcquire;
    private boolean _isStarted;
    private final AtomicBoolean _syncReceive;
    private String _consumerTagString;

    /* JADX INFO: Access modifiers changed from: protected */
    public BasicMessageConsumer_0_10(int i, AMQConnection aMQConnection, AMQDestination aMQDestination, String str, boolean z, MessageFactoryRegistry messageFactoryRegistry, AMQSession aMQSession, AMQProtocolHandler aMQProtocolHandler, FieldTable fieldTable, int i2, int i3, boolean z2, int i4, boolean z3, boolean z4) throws JMSException {
        super(i, aMQConnection, aMQDestination, str, z, messageFactoryRegistry, aMQSession, aMQProtocolHandler, fieldTable, i2, i3, z2, i4, z3, z4);
        this._logger = LoggerFactory.getLogger(getClass());
        this._filter = null;
        this._preAcquire = true;
        this._isStarted = false;
        this._syncReceive = new AtomicBoolean(false);
        this._0_10session = (AMQSession_0_10) aMQSession;
        if (str != null && !str.equals("")) {
            try {
                this._filter = new JMSSelectorFilter(str);
                if (aMQDestination instanceof AMQQueue) {
                    this._preAcquire = false;
                }
            } catch (QpidException e) {
                throw new InvalidSelectorException("cannot create consumer because of selector issue");
            }
        }
        this._isStarted = aMQConnection.started();
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void setConsumerTag(int i) {
        super.setConsumerTag(i);
        this._consumerTagString = String.valueOf(i);
    }

    public String getConsumerTagString() {
        return this._consumerTagString;
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void notifyMessage(AbstractJMSMessage abstractJMSMessage) {
        boolean z = false;
        try {
            z = checkPreConditions(abstractJMSMessage);
        } catch (AMQException e) {
            this._logger.error("Receivecd an Exception when receiving message", (Throwable) e);
            try {
                getSession().getAMQConnection().getExceptionListener().onException(new JMSAMQException("Error when receiving message", e));
            } catch (Exception e2) {
                this._logger.error("Exception when receiving message", (Throwable) e2);
            }
        }
        if (z) {
            if (isMessageListenerSet() && !getSession().prefetch()) {
                this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, Option.UNRELIABLE);
            }
            this._logger.debug("messageOk, trying to notify");
            super.notifyMessage(abstractJMSMessage);
        }
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    void sendCancel() throws AMQException {
        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString(), new Option[0]);
        ((AMQSession_0_10) getSession()).getQpidSession().sync();
        getSession().confirmConsumerCancelled(getConsumerTag());
        ((AMQSession_0_10) getSession()).getCurrentException();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void notifyMessage(UnprocessedMessage_0_10 unprocessedMessage_0_10) {
        super.notifyMessage((BasicMessageConsumer_0_10) unprocessedMessage_0_10);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void preApplicationProcessing(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        super.preApplicationProcessing(abstractJMSMessage);
        if (this._session.getTransacted() || this._session.getAcknowledgeMode() == 2) {
            return;
        }
        this._session.addUnacknowledgedMessage(abstractJMSMessage.getDeliveryTag());
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory aMQMessageDelegateFactory, UnprocessedMessage_0_10 unprocessedMessage_0_10) throws Exception {
        AMQMessageDelegate_0_10.updateExchangeTypeMapping(unprocessedMessage_0_10.getMessageTransfer().getHeader(), ((AMQSession_0_10) getSession()).getQpidSession());
        return this._messageFactory.createMessage(unprocessedMessage_0_10.getMessageTransfer());
    }

    private boolean checkPreConditions(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        boolean z = true;
        try {
            if (this._messageSelector != null && !this._messageSelector.equals("")) {
                z = this._filter.matches(abstractJMSMessage);
            }
            if (this._logger.isDebugEnabled()) {
                this._logger.debug("messageOk " + z);
                this._logger.debug("_preAcquire " + this._preAcquire);
            }
            if (!z) {
                if (this._preAcquire) {
                    if (this._logger.isDebugEnabled()) {
                        this._logger.debug("filterMessage - trying to ack message");
                    }
                    acknowledgeMessage(abstractJMSMessage);
                } else {
                    if (this._logger.isDebugEnabled()) {
                        this._logger.debug("Message not OK, releasing");
                    }
                    releaseMessage(abstractJMSMessage);
                }
                if (!getSession().prefetch()) {
                    this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, Option.UNRELIABLE);
                }
            }
            if (!this._preAcquire && z && !isNoConsume()) {
                if (this._logger.isDebugEnabled()) {
                    this._logger.debug("filterMessage - trying to acquire message");
                }
                z = acquireMessage(abstractJMSMessage);
                this._logger.debug("filterMessage - message acquire status : " + z);
            }
            return z;
        } catch (Exception e) {
            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
        }
    }

    private void acknowledgeMessage(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        if (this._preAcquire) {
            return;
        }
        RangeSet rangeSet = new RangeSet();
        rangeSet.add((int) abstractJMSMessage.getDeliveryTag());
        this._0_10session.messageAcknowledge(rangeSet, this._acknowledgeMode != 257);
        this._0_10session.getCurrentException();
    }

    private void releaseMessage(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        if (this._preAcquire) {
            RangeSet rangeSet = new RangeSet();
            rangeSet.add((int) abstractJMSMessage.getDeliveryTag());
            this._0_10session.getQpidSession().messageRelease(rangeSet, new Option[0]);
            this._0_10session.getCurrentException();
        }
    }

    private boolean acquireMessage(AbstractJMSMessage abstractJMSMessage) throws AMQException {
        boolean z = false;
        if (!this._preAcquire) {
            RangeSet rangeSet = new RangeSet();
            rangeSet.add((int) abstractJMSMessage.getDeliveryTag());
            RangeSet transfers = this._0_10session.getQpidSession().messageAcquire(rangeSet, new Option[0]).get().getTransfers();
            if (transfers != null && transfers.size() > 0) {
                z = true;
            }
        }
        return z;
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        super.setMessageListener(messageListener);
        if (messageListener != null && !getSession().prefetch()) {
            this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, Option.UNRELIABLE);
        }
        if (messageListener == null || this._synchronousQueue.isEmpty()) {
            return;
        }
        Iterator it = this._synchronousQueue.iterator();
        while (it.hasNext()) {
            AbstractJMSMessage abstractJMSMessage = (AbstractJMSMessage) it.next();
            it.remove();
            this._session.rejectMessage(abstractJMSMessage, true);
        }
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void failedOverPost() {
        if (this._0_10session.isStarted() && this._syncReceive.get()) {
            this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, Option.UNRELIABLE);
        }
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public Object getMessageFromQueue(long j) throws InterruptedException {
        if (!getSession().prefetch()) {
            this._syncReceive.set(true);
        }
        if (this._0_10session.isStarted() && !getSession().prefetch() && this._synchronousQueue.isEmpty()) {
            this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1L, Option.UNRELIABLE);
        }
        Object messageFromQueue = super.getMessageFromQueue(j);
        if (messageFromQueue == null && this._0_10session.isStarted()) {
            this._0_10session.getQpidSession().messageFlush(getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
            this._0_10session.getQpidSession().sync();
            this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.BYTE, -1L, Option.UNRELIABLE);
            if (getSession().prefetch()) {
                this._0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, this._0_10session.getAMQConnection().getMaxPrefetch(), Option.UNRELIABLE);
            }
            this._0_10session.syncDispatchQueue();
            messageFromQueue = super.getMessageFromQueue(-1L);
        }
        if (!getSession().prefetch()) {
            this._syncReceive.set(false);
        }
        return messageFromQueue;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void postDeliver(AbstractJMSMessage abstractJMSMessage) throws JMSException {
        super.postDeliver(abstractJMSMessage);
        if (this._acknowledgeMode == 257 && !this._session.isInRecovery()) {
            this._session.acknowledgeMessage(abstractJMSMessage.getDeliveryTag(), false);
        }
        if (this._acknowledgeMode == 1 && !this._session.isInRecovery() && this._session.getAMQConnection().getSyncAck()) {
            ((AMQSession_0_10) getSession()).flushAcknowledgments();
            ((AMQSession_0_10) getSession()).getQpidSession().sync();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.qpid.client.BasicMessageConsumer
    public Message receiveBrowse() throws JMSException {
        return receiveNoWait();
    }

    @Override // org.apache.qpid.client.BasicMessageConsumer
    public void rollbackPendingMessages() {
        if (this._synchronousQueue.size() > 0) {
            RangeSet rangeSet = new RangeSet();
            Iterator it = this._synchronousQueue.iterator();
            while (it.hasNext()) {
                Object next = it.next();
                if (next instanceof AbstractJMSMessage) {
                    rangeSet.add((int) ((AbstractJMSMessage) next).getDeliveryTag());
                    it.remove();
                } else {
                    this._logger.error("Queue contained a :" + next.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                    it.remove();
                }
            }
            this._0_10session.getQpidSession().messageRelease(rangeSet, Option.SET_REDELIVERED);
            clearReceiveQueue();
        }
    }
}
