/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.CurrentThreadScheduler;
import rx.schedulers.ImmediateScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func2;

public class OperationObserveOn {
    public static <T> Observable.OnSubscribeFunc<T> observeOn(Observable<? extends T> source, Scheduler scheduler) {
        return new ObserveOn<T>(source, scheduler);
    }

    private static class ObserveOn<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends T> source;
        private final Scheduler scheduler;

        public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }

        @Override
        public Subscription onSubscribe(Observer<? super T> observer) {
            if (this.scheduler instanceof ImmediateScheduler) {
                return this.source.subscribe(observer);
            }
            if (this.scheduler instanceof CurrentThreadScheduler) {
                return this.source.subscribe(observer);
            }
            return new Observation(observer).init();
        }

        private class Observation {
            final Observer<? super T> observer;
            final CompositeSubscription compositeSubscription = new CompositeSubscription(new Subscription[0]);
            final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
            final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue();
            final AtomicLong counter = new AtomicLong(0L);
            private volatile Scheduler recursiveScheduler;

            public Observation(Observer<? super T> observer) {
                this.observer = observer;
            }

            public Subscription init() {
                this.compositeSubscription.add(ObserveOn.this.source.materialize().subscribe(new SourceObserver()));
                return this.compositeSubscription;
            }

            private class SourceObserver
            implements Action1<Notification<? extends T>> {
                private SourceObserver() {
                }

                @Override
                public void call(Notification<? extends T> e) {
                    Observation.this.queue.offer(e);
                    if (Observation.this.counter.getAndIncrement() == 0L) {
                        if (Observation.this.recursiveScheduler == null) {
                            Observation.this.compositeSubscription.add(ObserveOn.this.scheduler.schedule(null, new Func2<Scheduler, T, Subscription>(){

                                @Override
                                public Subscription call(Scheduler innerScheduler, T state) {
                                    Observation.this.recursiveScheduler = innerScheduler;
                                    SourceObserver.this.processQueue();
                                    return Observation.this.recursiveSubscription;
                                }
                            }));
                        } else {
                            this.processQueue();
                        }
                    }
                }

                void processQueue() {
                    Observation.this.recursiveSubscription.set(Observation.this.recursiveScheduler.schedule(new Action1<Action0>(){

                        @Override
                        public void call(Action0 self) {
                            Notification not = Observation.this.queue.poll();
                            if (not != null) {
                                not.accept(Observation.this.observer);
                            }
                            if (Observation.this.counter.decrementAndGet() > 0L) {
                                self.call();
                            }
                        }
                    }));
                }
            }
        }
    }
}

