/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.operators.ChunkedOperation;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public final class OperationBuffer
extends ChunkedOperation {
    private static <T> Func0<Buffer<T>> bufferMaker() {
        return new Func0<Buffer<T>>(){

            @Override
            public Buffer<T> call() {
                return new Buffer();
            }
        };
    }

    public static <T, TClosing> Observable.OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) {
        return new Observable.OnSubscribeFunc<List<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super List<T>> observer) {
                ChunkedOperation.NonOverlappingChunks buffers = new ChunkedOperation.NonOverlappingChunks(observer, OperationBuffer.bufferMaker());
                ChunkedOperation.ObservableBasedSingleChunkCreator creator = new ChunkedOperation.ObservableBasedSingleChunkCreator(buffers, bufferClosingSelector);
                return new CompositeSubscription(new ChunkToSubscription(creator), source.subscribe(new ChunkedOperation.ChunkObserver(buffers, observer, creator)));
            }
        };
    }

    public static <T, TOpening, TClosing> Observable.OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final Observable<? extends TOpening> bufferOpenings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector) {
        return new Observable.OnSubscribeFunc<List<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super List<T>> observer) {
                ChunkedOperation.OverlappingChunks buffers = new ChunkedOperation.OverlappingChunks(observer, OperationBuffer.bufferMaker());
                ChunkedOperation.ObservableBasedMultiChunkCreator creator = new ChunkedOperation.ObservableBasedMultiChunkCreator(buffers, bufferOpenings, bufferClosingSelector);
                return new CompositeSubscription(new ChunkToSubscription(creator), source.subscribe(new ChunkedOperation.ChunkObserver(buffers, observer, creator)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(Observable<T> source, int count) {
        return OperationBuffer.buffer(source, count, count);
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final int count, final int skip) {
        return new Observable.OnSubscribeFunc<List<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super List<T>> observer) {
                ChunkedOperation.SizeBasedChunks chunks = new ChunkedOperation.SizeBasedChunks(observer, OperationBuffer.bufferMaker(), count);
                ChunkedOperation.SkippingChunkCreator creator = new ChunkedOperation.SkippingChunkCreator(chunks, skip);
                return new CompositeSubscription(new ChunkToSubscription(creator), source.subscribe(new ChunkedOperation.ChunkObserver(chunks, observer, creator)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit) {
        return OperationBuffer.buffer(source, timespan, unit, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<List<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super List<T>> observer) {
                ChunkedOperation.NonOverlappingChunks buffers = new ChunkedOperation.NonOverlappingChunks(observer, OperationBuffer.bufferMaker());
                ChunkedOperation.TimeBasedChunkCreator creator = new ChunkedOperation.TimeBasedChunkCreator(buffers, timespan, unit, scheduler);
                return new CompositeSubscription(new ChunkToSubscription(creator), source.subscribe(new ChunkedOperation.ChunkObserver(buffers, observer, creator)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit, int count) {
        return OperationBuffer.buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<List<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super List<T>> observer) {
                ChunkedOperation.TimeAndSizeBasedChunks chunks = new ChunkedOperation.TimeAndSizeBasedChunks(observer, OperationBuffer.bufferMaker(), count, timespan, unit, scheduler);
                ChunkedOperation.SingleChunkCreator creator = new ChunkedOperation.SingleChunkCreator(chunks);
                return new CompositeSubscription(chunks, new ChunkToSubscription(creator), source.subscribe(new ChunkedOperation.ChunkObserver(chunks, observer, creator)));
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
        return OperationBuffer.buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<List<T>>(){

            @Override
            public Subscription onSubscribe(Observer<? super List<T>> observer) {
                ChunkedOperation.TimeBasedChunks buffers = new ChunkedOperation.TimeBasedChunks(observer, OperationBuffer.bufferMaker(), timespan, unit, scheduler);
                ChunkedOperation.TimeBasedChunkCreator creator = new ChunkedOperation.TimeBasedChunkCreator(buffers, timeshift, unit, scheduler);
                return new CompositeSubscription(buffers, new ChunkToSubscription(creator), source.subscribe(new ChunkedOperation.ChunkObserver(buffers, observer, creator)));
            }
        };
    }

    public static <T, B> Observable.OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary) {
        return new BufferWithObservableBoundary<T, B>(source, boundary, 16);
    }

    public static <T, B> Observable.OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
        if (initialCapacity <= 0) {
            throw new IllegalArgumentException("initialCapacity > 0 required");
        }
        return new BufferWithObservableBoundary<T, B>(source, boundary, initialCapacity);
    }

    private static final class BufferWithObservableBoundary<T, B>
    implements Observable.OnSubscribeFunc<List<T>> {
        final Observable<? extends T> source;
        final Observable<B> boundary;
        final int initialCapacity;

        public BufferWithObservableBoundary(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
            this.source = source;
            this.boundary = boundary;
            this.initialCapacity = initialCapacity;
        }

        @Override
        public Subscription onSubscribe(Observer<? super List<T>> t1) {
            CompositeSubscription csub = new CompositeSubscription(new Subscription[0]);
            SourceObserver so = new SourceObserver(t1, this.initialCapacity, csub);
            csub.add(this.source.subscribe(so));
            csub.add(this.boundary.subscribe(new BoundaryObserver(so)));
            return csub;
        }

        private static final class BoundaryObserver<T>
        implements Observer<T> {
            final SourceObserver so;

            public BoundaryObserver(SourceObserver so) {
                this.so = so;
            }

            @Override
            public void onNext(T args) {
                this.so.emitAndReplace();
            }

            @Override
            public void onError(Throwable e) {
                this.so.onError(e);
            }

            @Override
            public void onCompleted() {
                this.so.onCompleted();
            }
        }

        private static final class SourceObserver<T>
        implements Observer<T> {
            final Observer<? super List<T>> observer;
            List<T> buffer;
            final int initialCapacity;
            final Object guard;
            final Subscription cancel;

            public SourceObserver(Observer<? super List<T>> observer, int initialCapacity, Subscription cancel) {
                this.observer = observer;
                this.initialCapacity = initialCapacity;
                this.guard = new Object();
                this.cancel = cancel;
                this.buffer = new ArrayList<T>(initialCapacity);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onNext(T args) {
                Object object = this.guard;
                synchronized (object) {
                    this.buffer.add(args);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onError(Throwable e) {
                Object object = this.guard;
                synchronized (object) {
                    if (this.buffer == null) {
                        return;
                    }
                    this.buffer = null;
                }
                this.observer.onError(e);
                this.cancel.unsubscribe();
            }

            @Override
            public void onCompleted() {
                this.emitAndComplete();
                this.cancel.unsubscribe();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            void emitAndReplace() {
                List<T> buf;
                Object object = this.guard;
                synchronized (object) {
                    if (this.buffer == null) {
                        return;
                    }
                    buf = this.buffer;
                    this.buffer = new ArrayList<T>(this.initialCapacity);
                }
                this.observer.onNext(buf);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            void emitAndComplete() {
                List<T> buf;
                Object object = this.guard;
                synchronized (object) {
                    if (this.buffer == null) {
                        return;
                    }
                    buf = this.buffer;
                    this.buffer = null;
                }
                this.observer.onNext(buf);
                this.observer.onCompleted();
            }
        }
    }

    private static class ChunkToSubscription
    implements Subscription {
        private ChunkedOperation.ChunkCreator cc;
        private final AtomicBoolean done;

        public ChunkToSubscription(ChunkedOperation.ChunkCreator cc) {
            this.cc = cc;
            this.done = new AtomicBoolean();
        }

        @Override
        public void unsubscribe() {
            if (this.done.compareAndSet(false, true)) {
                ChunkedOperation.ChunkCreator cc0 = this.cc;
                this.cc = null;
                cc0.stop();
            }
        }
    }

    protected static class Buffer<T>
    extends ChunkedOperation.Chunk<T, List<T>> {
        protected Buffer() {
        }

        @Override
        public List<T> getContents() {
            return this.contents;
        }
    }
}

