/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public class ChunkedOperation {

    /**
     * A {@link ChunkCreator} that opens a new chunk based on how many values have been pushed.
     *
     * <p>The internal counter starts at 1, so the very first pushed value opens a chunk. The
     * counter is then reset to {@code skip}, so a further chunk is opened each time {@code skip}
     * more values have been pushed.
     *
     * <p>NOTE(review): the decrement and the reset are two separate operations on the counter;
     * this presumably relies on {@link #onValuePushed()} not being invoked concurrently — confirm.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class SkippingChunkCreator<T, C>
    implements ChunkCreator {
        private final AtomicInteger skipped = new AtomicInteger(1);
        private final Chunks<T, C> chunks;
        private final int skip;

        /**
         * @param chunks the chunk collection in which new chunks are created
         * @param skip   the value the counter is reset to after each chunk creation
         */
        public SkippingChunkCreator(Chunks<T, C> chunks, int skip) {
            this.chunks = chunks;
            this.skip = skip;
        }

        /**
         * Counts one pushed value and, when the counter reaches zero, resets it to
         * {@code skip} and creates a new chunk.
         */
        @Override
        public void onValuePushed() {
            final int remaining = skipped.decrementAndGet();
            if (remaining != 0) {
                return;
            }
            skipped.set(skip);
            chunks.createChunk();
        }

        /** Does nothing: this creator holds no subscription or timer. */
        @Override
        public void stop() {
            // intentionally empty
        }
    }

    /**
     * A {@link ChunkCreator} driven by a periodically scheduled action.
     *
     * <p>Depending on the constructor used, each tick either emits and replaces the current chunk
     * (non-overlapping chunks) or just opens an additional chunk (overlapping chunks). The
     * periodic action is scheduled with an initial delay of zero.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class TimeBasedChunkCreator<T, C>
    implements ChunkCreator {
        private final SafeObservableSubscription subscription = new SafeObservableSubscription();

        /**
         * Schedules a periodic action that calls {@code chunks.emitAndReplaceChunk()}.
         *
         * @param chunks    the non-overlapping chunk collection to roll over on every tick
         * @param time      the period between ticks
         * @param unit      the unit of {@code time}
         * @param scheduler the scheduler the periodic action is scheduled on
         */
        public TimeBasedChunkCreator(final NonOverlappingChunks<T, C> chunks, long time, TimeUnit unit, Scheduler scheduler) {
            final Action0 rollOver = new Action0() {

                @Override
                public void call() {
                    chunks.emitAndReplaceChunk();
                }
            };
            final Subscription periodic = scheduler.schedulePeriodically(rollOver, 0L, time, unit);
            subscription.wrap(periodic);
        }

        /**
         * Schedules a periodic action that calls {@code chunks.createChunk()}.
         *
         * @param chunks    the overlapping chunk collection in which a chunk is opened on every tick
         * @param time      the period between ticks
         * @param unit      the unit of {@code time}
         * @param scheduler the scheduler the periodic action is scheduled on
         */
        public TimeBasedChunkCreator(final OverlappingChunks<T, C> chunks, long time, TimeUnit unit, Scheduler scheduler) {
            final Action0 openChunk = new Action0() {

                @Override
                public void call() {
                    chunks.createChunk();
                }
            };
            final Subscription periodic = scheduler.schedulePeriodically(openChunk, 0L, time, unit);
            subscription.wrap(periodic);
        }

        /** Does nothing: chunk creation is driven by the scheduled action, not by pushed values. */
        @Override
        public void onValuePushed() {
            // intentionally empty
        }

        /** Unsubscribes the wrapped subscription of the periodic action. */
        @Override
        public void stop() {
            subscription.unsubscribe();
        }
    }

    /**
     * A {@link ChunkCreator} that opens a chunk for every item of an "openings" observable and
     * emits that chunk when the matching "closing" observable produces an item.
     *
     * <p>For each opening item a new chunk is created, the closing selector is called with that
     * item, and the returned observable is subscribed to; every item it produces triggers
     * {@code chunks.emitChunk(chunk)} for the chunk opened by that opening item.
     *
     * <p>NOTE(review): only the subscription to {@code openings} is wrapped and therefore
     * unsubscribed by {@link #stop()}. The subscriptions returned by subscribing to the closing
     * observables are discarded — confirm whether they are expected to be cancelled on stop.
     *
     * @param <T>        the type of the values collected into chunks
     * @param <C>        the type of the emitted chunk contents
     * @param <TOpening> the item type of the openings observable
     * @param <TClosing> the item type of the closing observables
     */
    protected static class ObservableBasedMultiChunkCreator<T, C, TOpening, TClosing>
    implements ChunkCreator {
        private final SafeObservableSubscription subscription = new SafeObservableSubscription();

        /**
         * Subscribes to {@code openings} and wires up chunk creation and emission.
         *
         * @param chunks               the overlapping chunk collection to create chunks in and emit chunks from
         * @param openings             the observable whose items each open a new chunk
         * @param chunkClosingSelector maps an opening item to the observable whose items close the chunk
         */
        public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends TOpening> openings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> chunkClosingSelector) {
            final Action1<TOpening> onOpening = new Action1<TOpening>() {

                @Override
                public void call(TOpening opening) {
                    // Typed (non-raw) locals: no unchecked conversion is needed anywhere below.
                    final Chunk<T, C> chunk = chunks.createChunk();
                    final Observable<? extends TClosing> closingObservable = chunkClosingSelector.call(opening);
                    closingObservable.subscribe(new Action1<TClosing>() {

                        @Override
                        public void call(TClosing closing) {
                            chunks.emitChunk(chunk);
                        }
                    });
                }
            };
            this.subscription.wrap(openings.subscribe(onOpening));
        }

        /** Does nothing: chunk creation is driven by the openings observable, not by pushed values. */
        @Override
        public void onValuePushed() {
            // intentionally empty
        }

        /** Unsubscribes the wrapped subscription to the openings observable. */
        @Override
        public void stop() {
            this.subscription.unsubscribe();
        }
    }

    /**
     * A {@link ChunkCreator} for non-overlapping chunks whose boundaries are signalled by a
     * "closing" observable obtained from a selector function.
     *
     * <p>On construction a first chunk is created and the selector is called to obtain a closing
     * observable. Each item from that observable emits and replaces the current chunk and then
     * calls the selector again to subscribe to a fresh closing observable.
     *
     * <p>NOTE(review): nothing in this class wraps a subscription into {@code subscription}; the
     * value returned by {@code closingObservable.subscribe(...)} is discarded. Confirm whether
     * {@link #stop()} is expected to cancel the closing observable. Also note that an earlier
     * closing observable is not unsubscribed when a new one is subscribed.
     *
     * @param <T>        the type of the values collected into chunks
     * @param <C>        the type of the emitted chunk contents
     * @param <TClosing> the item type of the closing observables
     */
    protected static class ObservableBasedSingleChunkCreator<T, C, TClosing>
    implements ChunkCreator {
        private final SafeObservableSubscription subscription = new SafeObservableSubscription();
        private final Func0<? extends Observable<? extends TClosing>> chunkClosingSelector;
        private final NonOverlappingChunks<T, C> chunks;

        /**
         * Creates the initial chunk and starts listening for the first closing signal.
         *
         * @param chunks               the non-overlapping chunk collection to manage
         * @param chunkClosingSelector supplies the observable whose items close the current chunk
         */
        public ObservableBasedSingleChunkCreator(NonOverlappingChunks<T, C> chunks, Func0<? extends Observable<? extends TClosing>> chunkClosingSelector) {
            this.chunks = chunks;
            this.chunkClosingSelector = chunkClosingSelector;
            chunks.createChunk();
            this.listenForChunkEnd();
        }

        /**
         * Obtains a closing observable from the selector and subscribes to it; every item it
         * produces rolls the current chunk over and re-arms by calling this method again.
         */
        private void listenForChunkEnd() {
            // The selector is declared to return Observable<? extends TClosing>; keep the wildcard
            // so the assignment type-checks without a cast.
            final Observable<? extends TClosing> closingObservable = this.chunkClosingSelector.call();
            closingObservable.subscribe(new Action1<TClosing>() {

                @Override
                public void call(TClosing closing) {
                    ObservableBasedSingleChunkCreator.this.chunks.emitAndReplaceChunk();
                    ObservableBasedSingleChunkCreator.this.listenForChunkEnd();
                }
            });
        }

        /** Does nothing: chunk boundaries are driven by the closing observable, not by pushed values. */
        @Override
        public void onValuePushed() {
            // intentionally empty
        }

        /** Unsubscribes the {@code subscription} field. */
        @Override
        public void stop() {
            this.subscription.unsubscribe();
        }
    }

    /**
     * A {@link ChunkCreator} that creates exactly one chunk, at construction time, and never
     * creates another.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class SingleChunkCreator<T, C>
    implements ChunkCreator {
        /**
         * Creates the single chunk in {@code chunks}.
         *
         * @param chunks the chunk collection in which the chunk is created
         */
        public SingleChunkCreator(Chunks<T, C> chunks) {
            chunks.createChunk();
        }

        /** Does nothing: no further chunks are created when values are pushed. */
        @Override
        public void onValuePushed() {
        }

        /** Does nothing: this creator holds no resources. */
        @Override
        public void stop() {
        }
    }

    /**
     * An {@link Observer} that feeds incoming values into a {@link Chunks} collection and
     * forwards terminal events to the downstream observer.
     *
     * <p>On both completion and error the creator is stopped and every chunk still held is
     * emitted before the terminal event is passed downstream.
     *
     * @param <T> the type of the incoming values
     * @param <C> the type of the emitted chunk contents
     */
    protected static class ChunkObserver<T, C>
    implements Observer<T> {
        private final Chunks<T, C> chunks;
        private final Observer<? super C> observer;
        private final ChunkCreator creator;

        /**
         * @param chunks   the chunk collection that receives pushed values
         * @param observer the downstream observer that receives the terminal events
         * @param creator  the creator notified of pushed values and stopped on termination
         */
        public ChunkObserver(Chunks<T, C> chunks, Observer<? super C> observer, ChunkCreator creator) {
            this.observer = observer;
            this.creator = creator;
            this.chunks = chunks;
        }

        /** Stops the creator, emits all remaining chunks, then completes the downstream observer. */
        @Override
        public void onCompleted() {
            stopAndFlush();
            observer.onCompleted();
        }

        /**
         * Stops the creator, emits all remaining chunks, then forwards the error downstream.
         *
         * @param e the error received from upstream
         */
        @Override
        public void onError(Throwable e) {
            stopAndFlush();
            observer.onError(e);
        }

        /**
         * Notifies the creator first (so it may open a chunk) and then pushes the value into
         * the chunks.
         *
         * @param args the value received from upstream
         */
        @Override
        public void onNext(T args) {
            creator.onValuePushed();
            chunks.pushValue(args);
        }

        /** Shared termination sequence: stop the creator, then emit every chunk still held. */
        private void stopAndFlush() {
            creator.stop();
            chunks.emitAllChunks();
        }
    }

    /**
     * Holds the chunks that are currently open and emits their contents to an observer.
     *
     * <p>Chunks are kept in a {@link ConcurrentLinkedQueue} in creation order. Every pushed value
     * is added to all chunks that are in the queue at the time of the push.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class Chunks<T, C> {
        private final Queue<Chunk<T, C>> chunks = new ConcurrentLinkedQueue<Chunk<T, C>>();
        private final Observer<? super C> observer;
        private final Func0<? extends Chunk<T, C>> chunkMaker;

        /**
         * @param observer   the observer that receives the contents of emitted chunks
         * @param chunkMaker the factory called to produce each new chunk
         */
        public Chunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker) {
            this.observer = observer;
            this.chunkMaker = chunkMaker;
        }

        /**
         * Creates a chunk via the factory and appends it to the queue of open chunks.
         *
         * @return the newly created chunk
         */
        public Chunk<T, C> createChunk() {
            final Chunk<T, C> created = this.chunkMaker.call();
            this.chunks.add(created);
            return created;
        }

        /** Removes every open chunk from the queue, in order, emitting each one's contents. */
        public void emitAllChunks() {
            for (Chunk<T, C> next = this.chunks.poll(); next != null; next = this.chunks.poll()) {
                this.observer.onNext(next.getContents());
            }
        }

        /**
         * Emits the contents of the given chunk if, and only if, it could be removed from the
         * queue of open chunks; otherwise does nothing.
         *
         * @param chunk the chunk to remove and emit
         */
        public void emitChunk(Chunk<T, C> chunk) {
            if (this.chunks.remove(chunk)) {
                this.observer.onNext(chunk.getContents());
            }
        }

        /**
         * @return the chunk at the head of the queue, or {@code null} when no chunk is open
         */
        public Chunk<T, C> getChunk() {
            return this.chunks.peek();
        }

        /**
         * Adds the value to every chunk that is open at the time of the call.
         *
         * @param value the value to add
         */
        public void pushValue(T value) {
            // Iterate over a snapshot rather than the live queue.
            final List<Chunk<T, C>> snapshot = new ArrayList<Chunk<T, C>>(this.chunks);
            for (Chunk<T, C> open : snapshot) {
                open.pushValue(value);
            }
        }
    }

    /**
     * A {@link Chunks} collection that emits the head chunk as soon as it holds at least
     * {@code size} values.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class SizeBasedChunks<T, C>
    extends Chunks<T, C> {
        private final int size;

        /**
         * @param observer   the observer that receives the contents of emitted chunks
         * @param chunkMaker the factory called to produce each new chunk
         * @param size       the number of values at which the head chunk is emitted
         */
        public SizeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, int size) {
            super(observer, chunkMaker);
            this.size = size;
        }

        /**
         * Pushes the value into all open chunks, then keeps emitting the head chunk for as long
         * as there is one and it has reached the configured size.
         *
         * @param value the value to add
         */
        @Override
        public void pushValue(T value) {
            super.pushValue(value);
            Chunk<T, C> head = this.getChunk();
            while (head != null && head.size() >= this.size) {
                this.emitChunk(head);
                head = this.getChunk();
            }
        }
    }

    /**
     * An {@link OverlappingChunks} collection in which every chunk is emitted by a scheduled
     * action a fixed delay after it was created.
     *
     * <p>The subscription of each pending scheduled action is remembered per chunk so that
     * {@link #unsubscribe()} can cancel the ones still outstanding.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class TimeBasedChunks<T, C>
    extends OverlappingChunks<T, C>
    implements Subscription {
        private final ConcurrentMap<Chunk<T, C>, Subscription> subscriptions = new ConcurrentHashMap<Chunk<T, C>, Subscription>();
        private final Scheduler scheduler;
        private final long time;
        private final TimeUnit unit;

        /**
         * @param observer   the observer that receives the contents of emitted chunks
         * @param chunkMaker the factory called to produce each new chunk
         * @param time       the delay after creation at which a chunk is emitted
         * @param unit       the unit of {@code time}
         * @param scheduler  the scheduler the emit actions are scheduled on
         */
        public TimeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, long time, TimeUnit unit, Scheduler scheduler) {
            super(observer, chunkMaker);
            this.time = time;
            this.unit = unit;
            this.scheduler = scheduler;
        }

        /**
         * Creates a chunk and schedules its emission after the configured delay, recording the
         * scheduled action's subscription under the chunk.
         *
         * @return the newly created chunk
         */
        @Override
        public Chunk<T, C> createChunk() {
            final Chunk<T, C> chunk = super.createChunk();
            final Action0 emitOnTimeout = new Action0() {

                @Override
                public void call() {
                    TimeBasedChunks.this.emitChunk(chunk);
                }
            };
            this.subscriptions.put(chunk, this.scheduler.schedule(emitOnTimeout, this.time, this.unit));
            return chunk;
        }

        /**
         * Forgets the subscription recorded for the chunk and then emits the chunk.
         *
         * @param chunk the chunk to emit
         */
        @Override
        public void emitChunk(Chunk<T, C> chunk) {
            this.subscriptions.remove(chunk);
            super.emitChunk(chunk);
        }

        /** Unsubscribes every scheduled action whose subscription is still recorded. */
        @Override
        public void unsubscribe() {
            for (Subscription pending : this.subscriptions.values()) {
                pending.unsubscribe();
            }
        }
    }

    /**
     * A {@link Chunks} collection in which a chunk is emitted either by a scheduled action
     * {@code maxTime} after its creation or once it holds at least {@code maxSize} values.
     * Emitting a chunk immediately creates its replacement.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class TimeAndSizeBasedChunks<T, C>
    extends Chunks<T, C>
    implements Subscription {
        private final ConcurrentMap<Chunk<T, C>, Subscription> subscriptions = new ConcurrentHashMap<Chunk<T, C>, Subscription>();
        private final Scheduler scheduler;
        private final long maxTime;
        private final TimeUnit unit;
        private final int maxSize;

        /**
         * @param observer   the observer that receives the contents of emitted chunks
         * @param chunkMaker the factory called to produce each new chunk
         * @param maxSize    the number of values at which the head chunk is emitted
         * @param maxTime    the delay after creation at which a chunk is emitted
         * @param unit       the unit of {@code maxTime}
         * @param scheduler  the scheduler the emit actions are scheduled on
         */
        public TimeAndSizeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) {
            super(observer, chunkMaker);
            this.maxSize = maxSize;
            this.maxTime = maxTime;
            this.unit = unit;
            this.scheduler = scheduler;
        }

        /**
         * Creates a chunk and schedules its emission after {@code maxTime}, recording the
         * scheduled action's subscription under the chunk.
         *
         * @return the newly created chunk
         */
        @Override
        public Chunk<T, C> createChunk() {
            final Chunk<T, C> chunk = super.createChunk();
            final Action0 emitOnTimeout = new Action0() {

                @Override
                public void call() {
                    TimeAndSizeBasedChunks.this.emitChunk(chunk);
                }
            };
            this.subscriptions.put(chunk, this.scheduler.schedule(emitOnTimeout, this.maxTime, this.unit));
            return chunk;
        }

        /**
         * Emits the chunk, cancels its scheduled action and creates a replacement chunk.
         *
         * <p>The call is a no-op when no subscription is recorded for the chunk, so a chunk is
         * only processed by the caller that manages to remove its map entry.
         *
         * @param chunk the chunk to emit
         */
        @Override
        public void emitChunk(Chunk<T, C> chunk) {
            final Subscription timer = this.subscriptions.remove(chunk);
            if (timer == null) {
                return;
            }
            super.emitChunk(chunk);
            timer.unsubscribe();
            this.createChunk();
        }

        /**
         * Pushes the value into all open chunks, then keeps emitting the head chunk for as long
         * as there is one and it has reached {@code maxSize}.
         *
         * @param value the value to add
         */
        @Override
        public void pushValue(T value) {
            super.pushValue(value);
            Chunk<T, C> head = this.getChunk();
            while (head != null && head.size() >= this.maxSize) {
                this.emitChunk(head);
                head = this.getChunk();
            }
        }

        /** Unsubscribes every scheduled action whose subscription is still recorded. */
        @Override
        public void unsubscribe() {
            for (Subscription pending : this.subscriptions.values()) {
                pending.unsubscribe();
            }
        }
    }

    /**
     * A {@link Chunks} collection that adds no behavior of its own; it serves as a distinct
     * type for creators that open chunks independently of one another (see
     * {@code TimeBasedChunkCreator} and {@code ObservableBasedMultiChunkCreator}).
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class OverlappingChunks<T, C>
    extends Chunks<T, C> {
        /**
         * @param observer   the observer that receives the contents of emitted chunks
         * @param chunkMaker the factory called to produce each new chunk
         */
        public OverlappingChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker) {
            super(observer, chunkMaker);
        }
    }

    /**
     * A {@link Chunks} collection for chunks that follow one another: the current chunk is
     * emitted and a new one is created in a single locked step.
     *
     * <p>Pushing a value and swapping the chunk both synchronize on the same private lock, so
     * the two operations do not interleave.
     *
     * @param <T> the type of the values collected into chunks
     * @param <C> the type of the emitted chunk contents
     */
    protected static class NonOverlappingChunks<T, C>
    extends Chunks<T, C> {
        private final Object lock = new Object();

        /**
         * @param observer   the observer that receives the contents of emitted chunks
         * @param chunkMaker the factory called to produce each new chunk
         */
        public NonOverlappingChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker) {
            super(observer, chunkMaker);
        }

        /**
         * Emits the chunk at the head of the queue and creates its replacement while holding
         * the lock.
         *
         * @return the newly created chunk
         */
        public Chunk<T, C> emitAndReplaceChunk() {
            synchronized (lock) {
                final Chunk<T, C> current = getChunk();
                emitChunk(current);
                return createChunk();
            }
        }

        /**
         * Pushes the value into the open chunks while holding the lock.
         *
         * @param value the value to add
         */
        @Override
        public void pushValue(T value) {
            synchronized (lock) {
                super.pushValue(value);
            }
        }
    }

    /**
     * Base class for a single chunk: an ordered collection of pushed values together with a
     * subclass-defined way of exposing them as the emitted contents.
     *
     * @param <T> the type of the values collected into the chunk
     * @param <C> the type of the contents produced by {@link #getContents()}
     */
    protected static abstract class Chunk<T, C> {
        /** The values pushed so far, in push order. */
        protected final List<T> contents = new ArrayList<T>();

        /** Creates an empty chunk. */
        protected Chunk() {
        }

        /**
         * Appends a value to this chunk.
         *
         * @param value the value to append
         */
        public void pushValue(T value) {
            contents.add(value);
        }

        /**
         * @return the contents of this chunk in the form that is emitted downstream
         */
        public abstract C getContents();

        /**
         * @return the number of values pushed into this chunk so far
         */
        public int size() {
            final int count = contents.size();
            return count;
        }
    }

    protected static interface ChunkCreator {
        public void onValuePushed();

        public void stop();
    }
}

