package org.springframework.cloud.netflix.turbine.stream;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

@EnableConfigurationProperties({TurbineStreamProperties.class})
@Configuration
/* loaded from: input_file:org/springframework/cloud/netflix/turbine/stream/TurbineStreamConfiguration.class */
public class TurbineStreamConfiguration implements SmartLifecycle {
    private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
    private static final String CLUSTER_PARAM = "cluster";
    private static final String DEFAULT_CLUSTER = "default";
    private static final String INSTANCE_ID_KEY = "instanceId";
    private AtomicBoolean running = new AtomicBoolean(false);

    @Autowired
    private TurbineStreamProperties properties;
    private int turbinePort;

    @Bean
    public HasFeatures Feature() {
        return HasFeatures.namedFeature("Turbine (Stream)", TurbineStreamProperties.class);
    }

    @Bean
    public PublishSubject<Map<String, Object>> hystrixSubject() {
        return PublishSubject.create();
    }

    @Bean
    public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
        Observable merge = Observable.merge(StreamAggregator.aggregateGroupedStreams(hystrixSubject().groupBy(map -> {
            return InstanceKey.create((String) map.get(INSTANCE_ID_KEY));
        })).doOnUnsubscribe(() -> {
            log.info("Unsubscribing aggregation.");
        }).doOnSubscribe(() -> {
            log.info("Starting aggregation");
        }).flatMap(groupedObservable -> {
            return groupedObservable;
        }).publish().refCount(), Observable.timer(1L, 10L, TimeUnit.SECONDS).map(l -> {
            return Collections.singletonMap("type", "Ping");
        }).publish().refCount());
        this.turbinePort = this.properties.getPort();
        if (this.turbinePort <= 0) {
            this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
        }
        return RxNetty.createHttpServer(this.turbinePort, (httpServerRequest, httpServerResponse) -> {
            log.info("SSE Request Received");
            httpServerResponse.getHeaders().setHeader("Content-Type", "text/event-stream");
            return merge.doOnUnsubscribe(() -> {
                log.info("Unsubscribing RxNetty server connection");
            }).filter(createClusterPredicate(httpServerRequest.getQueryParameters())).flatMap(map2 -> {
                return httpServerResponse.writeAndFlush(new ServerSentEvent((ByteBuf) null, Unpooled.copiedBuffer("message", StandardCharsets.UTF_8), Unpooled.copiedBuffer(JsonUtility.mapToJson(map2) + "\n", StandardCharsets.UTF_8)));
            });
        }, PipelineConfigurators.serveSseConfigurator());
    }

    Func1<Map<String, Object>, Boolean> createClusterPredicate(Map<String, List<String>> map) {
        List<String> list = map.get(CLUSTER_PARAM);
        return (list == null || list.isEmpty() || list.contains(DEFAULT_CLUSTER)) ? map2 -> {
            return true;
        } : map3 -> {
            String str = (String) map3.get(INSTANCE_ID_KEY);
            if (str == null) {
                return true;
            }
            return Boolean.valueOf(list.stream().anyMatch(str2 -> {
                return str.toLowerCase().startsWith(str2.toLowerCase() + ":");
            }));
        };
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    public void start() {
        if (this.running.compareAndSet(false, true)) {
            aggregatorServer().start();
        }
    }

    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            try {
                aggregatorServer().shutdown();
            } catch (InterruptedException e) {
                log.error("Error shutting down", e);
            }
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public int getPhase() {
        return 0;
    }

    public int getTurbinePort() {
        return this.turbinePort;
    }
}
