/*
 * Decompiled with CFR 0.152.
 */
package com.hand.hap.message;

import com.hand.hap.core.AppContextInitListener;
import com.hand.hap.message.IQueueMessageListener;
import com.hand.hap.message.MethodReflectUtils;
import com.hand.hap.message.QueueMonitor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import redis.clients.jedis.Jedis;

public class QueueListenerContainer
implements AppContextInitListener,
DisposableBean,
SmartLifecycle {
    private Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);
    private RedisConnectionFactory connectionFactory;
    private static final int PHASE = 9999;
    private static final long MIN_RECOVERY_INTERVAL = 2000L;
    private static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    private static final long IDLE_SLEEP_TIME = 100L;
    private long recoveryInterval = 5000L;
    private volatile boolean running = false;
    private ExecutorService executorService;
    private List<IQueueMessageListener<?>> listeners;
    private List<MonitorTask> monitorTaskList = new ArrayList<MonitorTask>();
    private RedisSerializer<String> stringRedisSerializer;

    public RedisConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public long getRecoveryInterval() {
        return this.recoveryInterval;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
        if (recoveryInterval < 2000L) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("minimum for recoveryInterval is {}", (Object)2000L);
            }
            this.recoveryInterval = 2000L;
        }
    }

    public List<IQueueMessageListener<?>> getListeners() {
        return this.listeners;
    }

    public void setListeners(List<IQueueMessageListener<?>> listeners) {
        this.listeners = listeners;
    }

    public RedisSerializer<String> getStringRedisSerializer() {
        return this.stringRedisSerializer;
    }

    @Autowired
    public void setStringRedisSerializer(RedisSerializer<String> stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    public void destroy() throws Exception {
        this.stop();
    }

    @Override
    public void contextInitialized(ApplicationContext applicationContext) {
        if (this.listeners == null) {
            this.listeners = new ArrayList();
        }
        Map lts = applicationContext.getBeansWithAnnotation(QueueMonitor.class);
        lts.forEach((k, v) -> {
            Class<?> clazz = v.getClass();
            QueueMonitor qm = clazz.getAnnotation(QueueMonitor.class);
            String queue = qm.queue();
            String mn = MethodReflectUtils.getQueueMethodName(qm.method(), v);
            List<Method> methods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
            if (methods.isEmpty()) {
                if (this.logger.isErrorEnabled()) {
                    this.logger.error("can not find proper method of name '{}' for bean {}", (Object)mn, v);
                }
                return;
            }
            Method method = methods.get(0);
            SimpleQueueListener qml = new SimpleQueueListener(queue, v, method);
            this.listeners.add(qml);
        });
        this.executorService = Executors.newFixedThreadPool(this.listeners.size());
        for (IQueueMessageListener<?> receiver : this.listeners) {
            MonitorTask task = new MonitorTask(receiver);
            this.monitorTaskList.add(task);
            this.executorService.execute((Runnable)((Object)task));
        }
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void start() {
        if (!this.running) {
            this.running = true;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("startup success");
            }
        }
    }

    public void stop() {
        if (this.isRunning()) {
            this.running = false;
            this.monitorTaskList.forEach(MonitorTask::stop);
            this.executorService.shutdownNow();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("shutdown complete");
            }
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 9999;
    }

    private class MonitorTask<T>
    implements SchedulingAwareRunnable {
        private IQueueMessageListener<T> receiver;
        private RedisConnection connection;
        private boolean running = false;

        MonitorTask(IQueueMessageListener<T> receiver) {
            this.receiver = receiver;
            Assert.notNull(receiver, (String)"receiver is null.");
            Assert.hasText((String)receiver.getQueue(), (String)"queue is not valid");
        }

        public void stop() {
            this.running = false;
            this.safeClose(true);
        }

        public void run() {
            this.running = true;
            while (this.running) {
                T message;
                block9: {
                    try {
                        if (this.connection == null) {
                            this.connection = QueueListenerContainer.this.connectionFactory.getConnection();
                        }
                        if ((message = this.fetchMessage(this.connection, this.receiver.getQueue())) == null) {
                            this.sleep_(100L);
                        }
                        break block9;
                    }
                    catch (Throwable thr) {
                        if (!this.running) break;
                        this.safeClose(new boolean[0]);
                        if (QueueListenerContainer.this.logger.isDebugEnabled()) {
                            QueueListenerContainer.this.logger.error("exception occurred while get message from queue [" + this.receiver.getQueue() + "]", thr);
                            QueueListenerContainer.this.logger.debug("try recovery after {}ms", (Object)QueueListenerContainer.this.getRecoveryInterval());
                        }
                        this.sleep_(QueueListenerContainer.this.getRecoveryInterval());
                    }
                    continue;
                }
                try {
                    this.receiver.onQueueMessage(message, this.receiver.getQueue());
                }
                catch (Throwable thr) {
                    if (!QueueListenerContainer.this.logger.isWarnEnabled()) continue;
                    QueueListenerContainer.this.logger.warn("exception occurred while receiver consume message.", thr);
                }
            }
            if (QueueListenerContainer.this.logger.isDebugEnabled()) {
                QueueListenerContainer.this.logger.debug("stop monitor:" + this);
            }
            this.safeClose(new boolean[0]);
        }

        T fetchMessage(RedisConnection connection, String queue) {
            List bytes = connection.bLPop(0, (byte[][])new byte[][]{QueueListenerContainer.this.stringRedisSerializer.serialize((Object)queue)});
            if (bytes == null || bytes.isEmpty()) {
                return null;
            }
            return (T)this.receiver.getRedisSerializer().deserialize((byte[])bytes.get(1));
        }

        void safeClose(boolean ... closeNative) {
            if (this.connection != null) {
                try {
                    if (closeNative.length > 0 && closeNative[0]) {
                        ((Jedis)this.connection.getNativeConnection()).disconnect();
                    }
                    this.connection.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.connection = null;
        }

        void sleep_(long time) {
            try {
                Thread.sleep(time);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    private static class SimpleQueueListener
    implements IQueueMessageListener {
        private String queue;
        private Object target;
        private Method method;
        private RedisSerializer redisSerializer;
        private Logger logger;

        SimpleQueueListener(String queue, Object target, Method method) {
            this.queue = queue;
            this.target = target;
            this.method = method;
            this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
            this.logger = LoggerFactory.getLogger(target.getClass());
        }

        @Override
        public String getQueue() {
            return this.queue;
        }

        public RedisSerializer getRedisSerializer() {
            return this.redisSerializer;
        }

        public void onQueueMessage(Object message, String queue) {
            block3: {
                try {
                    this.method.invoke(this.target, message, queue);
                }
                catch (Exception e) {
                    Throwable thr = e;
                    while (thr.getCause() != null) {
                        thr = thr.getCause();
                    }
                    if (!this.logger.isErrorEnabled()) break block3;
                    this.logger.error(thr.getMessage(), thr);
                }
            }
        }
    }
}

