/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.scout.rt.server.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import java.util.List;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.scout.commons.StringUtility;
import org.eclipse.scout.commons.exception.ProcessingException;
import org.eclipse.scout.commons.job.JobEx;
import org.eclipse.scout.commons.logger.IScoutLogger;
import org.eclipse.scout.commons.logger.ScoutLogManager;
import org.eclipse.scout.commons.serialization.IObjectSerializer;
import org.eclipse.scout.commons.serialization.SerializationUtility;
import org.eclipse.scout.rt.server.services.common.clustersync.IClusterNotificationMessage;
import org.eclipse.scout.rt.server.services.common.clustersync.IPublishSubscribeMessageListener;
import org.eclipse.scout.rt.server.services.common.clustersync.IPublishSubscribeMessageService;
import org.eclipse.scout.service.AbstractService;
import org.osgi.framework.ServiceRegistration;

public class RabbitMQMessageService
extends AbstractService
implements IPublishSubscribeMessageService {
    private static final IScoutLogger LOG = ScoutLogManager.getLogger(RabbitMQMessageService.class);
    public static final String NOTIFICATION_QUEUE_NAME = "scoutNotificationQueue";
    private IObjectSerializer m_objectSerializer;
    private String m_host;
    private String m_user;
    private String m_password;
    private String m_uri;
    private boolean m_authEnabled;
    private IPublishSubscribeMessageListener m_listener;
    private Job m_job;

    public void subscribe() throws ProcessingException {
        this.m_job = this.createMessageListenerJob();
        this.m_job.schedule();
    }

    public void unsubsribe() throws ProcessingException {
        if (this.m_job != null) {
            this.m_job.cancel();
        }
    }

    public void publishNotifications(List<IClusterNotificationMessage> notificationMessages) {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(this.m_uri);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(NOTIFICATION_QUEUE_NAME, "fanout");
            for (IClusterNotificationMessage notificationMessage : notificationMessages) {
                channel.basicPublish(NOTIFICATION_QUEUE_NAME, "", null, this.m_objectSerializer.serialize((Object)notificationMessage));
            }
            channel.close();
            connection.close();
        }
        catch (Exception e) {
            LOG.error("Cannot publish cluster node notifications " + notificationMessages, (Throwable)e);
        }
    }

    public void initializeService(ServiceRegistration registration) {
        super.initializeService(registration);
        this.m_objectSerializer = SerializationUtility.createObjectSerializer();
        this.setUri();
    }

    protected void setUri() {
        if (!StringUtility.hasText((CharSequence)this.m_host) && !StringUtility.hasText((CharSequence)this.m_uri)) {
            throw new IllegalArgumentException("Host or UIR for RabbitMQ has to be set in config.ini");
        }
        if (!StringUtility.hasText((CharSequence)this.m_uri)) {
            if (this.m_authEnabled) {
                if (!StringUtility.hasText((CharSequence)this.m_user)) {
                    throw new IllegalArgumentException("Host for RabbitMQ has to be set in config.ini");
                }
                if (!StringUtility.hasText((CharSequence)this.m_password)) {
                    throw new IllegalArgumentException("Host for RabbitMQ has to be set in config.ini");
                }
                this.m_uri = "amqp://" + this.m_user + ":" + this.m_password + "@" + this.m_host;
            } else {
                this.m_uri = "amqp://" + this.m_host;
            }
        }
    }

    protected Job createMessageListenerJob() {
        return new JobEx("RabbitMQMessageListener Job"){

            protected IStatus run(IProgressMonitor monitor) {
                try {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setUri(RabbitMQMessageService.this.m_uri);
                    Connection connection = factory.newConnection();
                    Channel incomingChannel = connection.createChannel();
                    incomingChannel.exchangeDeclare(RabbitMQMessageService.NOTIFICATION_QUEUE_NAME, "fanout");
                    String queueName = incomingChannel.queueDeclare().getQueue();
                    incomingChannel.queueBind(queueName, RabbitMQMessageService.NOTIFICATION_QUEUE_NAME, "");
                    QueueingConsumer consumer = new QueueingConsumer(incomingChannel);
                    incomingChannel.basicConsume(queueName, true, (Consumer)consumer);
                    while (true) {
                        this.tryHandleMessage(consumer);
                    }
                }
                catch (Exception e) {
                    LOG.error("Unable to set up message listener", (Throwable)e);
                    return Status.OK_STATUS;
                }
            }

            private void tryHandleMessage(QueueingConsumer consumer) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    byte[] message = delivery.getBody();
                    IClusterNotificationMessage notificationMessage = (IClusterNotificationMessage)RabbitMQMessageService.this.m_objectSerializer.deserialize(message, IClusterNotificationMessage.class);
                    if (RabbitMQMessageService.this.m_listener != null) {
                        RabbitMQMessageService.this.m_listener.onMessage(notificationMessage);
                    }
                }
                catch (Exception e) {
                    LOG.error("Unable to read incoming message", (Throwable)e);
                }
            }
        };
    }

    public void setHost(String host) {
        this.m_host = host;
    }

    public void setUser(String user) {
        this.m_user = user;
    }

    public void setPassword(String password) {
        this.m_password = password;
    }

    public void setUri(String uri) {
        this.m_uri = uri;
    }

    public void setAuth(boolean enabled) {
        this.m_authEnabled = enabled;
    }

    public void setListener(IPublishSubscribeMessageListener listener) {
        this.m_listener = listener;
    }

    public IPublishSubscribeMessageListener getListener() {
        return this.m_listener;
    }
}

