/**
 * Collects every group emitted by {@code observable} into a map from group key to the group's values.
 * <p>
 * The outer source is consumed with {@code blockingForEach}; for each group a fresh
 * {@link ConcurrentLinkedQueue} is stored under the group's key and the group is then subscribed to,
 * appending each value it emits to that queue.
 *
 * @param <K> the group key type
 * @param <V> the group value type
 * @param observable the source of {@code GroupedObservable}s to collect
 * @return the map populated while consuming {@code observable}
 */
private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
observable.blockingForEach(new Consumer<GroupedObservable<K, V>>() {
@Override
public void accept(final GroupedObservable<K, V> o) {
// Store the queue before subscribing: the inner consumer looks it up with result.get(...).
result.put(o.getKey(), new ConcurrentLinkedQueue<V>());
o.subscribe(new Consumer<V>() {
@Override
public void accept(V v) {
result.get(o.getKey()).add(v);
return result;
/**
 * Expects {@code TestException}: the consumer handed to {@code blockingForEach} throws it for the
 * single item emitted by {@code Observable.just(1)}.
 */
@Test(expected = TestException.class) public void blockingForEachThrows() { Observable.just(1) .blockingForEach(new Consumer<Integer>() { @Override public void accept(Integer e) throws Exception { throw new TestException();
/**
 * Turns an {@code Observable} of {@code Observable}s into a list of lists.
 * <p>
 * Each inner source is mapped to a source of a single list ({@code toList().toObservable()}), the
 * mapped sources are joined with {@code Observable.concat}, and every list that arrives is appended
 * to the result inside {@code blockingForEach}.
 *
 * @param <T> the element type of the inner sources
 * @param observables the source of inner sources to collect
 * @return the collected lists, in the order they were delivered to the consumer
 */
private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {
final List<List<T>> lists = new ArrayList<List<T>>();
Observable.concat(observables.map(new Function<Observable<T>, Observable<List<T>>>() {
@Override
public Observable<List<T>> apply(Observable<T> xs) {
return xs.toList().toObservable();
.blockingForEach(new Consumer<List<T>>() {
@Override
public void accept(List<T> xs) {
lists.add(xs);
return lists;
// Blocks inside the mapper on a source whose single item is delayed by 10 seconds, then returns the
// input unchanged.
// NOTE(review): presumably this exists to exercise interruption/disposal of a blocked call — confirm
// against the enclosing test, which is not visible here.
@Override public Integer apply(Integer v) throws Exception { Observable.just(1).delay(10, TimeUnit.SECONDS).blockingForEach(Functions.emptyConsumer()); return v;
/**
* This won't compile if super/extends isn't done correctly on generics.
*/
@Test
public void testCovarianceOfZip() {
// Sources are declared with the most specific element types; the explicit type arguments on each
// zip call below name other types for the same sources.
// NOTE(review): presumably HorrorMovie is a subtype of Movie/Media and CoolRating of Rating, and
// combine/action/extendedAction are fields of the test class — none are declared in this excerpt.
Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
// The result of this call is not consumed; the statement only has to type-check.
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
/**
* This won't compile if super/extends isn't done correctly on generics.
*/
@Test
public void testCovarianceOfCombineLatest() {
// Sources are declared with the most specific element types; the explicit type arguments on each
// combineLatest call below name other types for the same sources.
// NOTE(review): presumably HorrorMovie is a subtype of Movie/Media and CoolRating of Rating, and
// combine/action/extendedAction are fields of the test class — none are declared in this excerpt.
Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
// The result of this call is not consumed; the statement only has to type-check.
Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);
/**
* Confirm that running on a NewThreadScheduler uses the same thread for the entire stream.
*/
@Test
public void testObserveOnWithNewThreadScheduler() {
// Number of values the consumer has received so far.
final AtomicInteger count = new AtomicInteger();
// Factor applied by the map step; the consumer multiplies its running count by the same factor.
final int _multiple = 99;
Observable.range(1, 100000).map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t1) {
return t1 * _multiple;
}).observeOn(Schedulers.newThread())
.blockingForEach(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
// The k-th value received must be k * _multiple, i.e. values arrive in source order.
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
// FIXME toBlocking methods run on the current thread
// NOTE(review): the assertion below only checks that the consumer is NOT running on a thread
// whose name starts with "Rx"; it does not check that a single scheduler thread is used.
String name = Thread.currentThread().getName();
assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));
/**
* Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
*/
@Test
public void testObserveOnWithThreadPoolScheduler() {
// Number of values the consumer has received so far.
final AtomicInteger count = new AtomicInteger();
// Factor applied by the map step; the consumer multiplies its running count by the same factor.
final int _multiple = 99;
Observable.range(1, 100000).map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t1) {
return t1 * _multiple;
}).observeOn(Schedulers.computation())
.blockingForEach(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
// The k-th value received must be k * _multiple, i.e. values arrive in source order.
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
// FIXME toBlocking methods run on the caller's thread
// NOTE(review): the assertion below only checks that the consumer is NOT running on a thread
// whose name starts with "Rx"; it does not observe which scheduler threads were used upstream.
String name = Thread.currentThread().getName();
assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));
/**
 * Splits {@code just(1, 2, 3, 4, 5, 6)} with {@code window(3)}, converts each window to a list,
 * concatenates the results and collects them via {@code blockingForEach}; then asserts that exactly
 * two lists were collected, holding 1-3 and 4-6.
 * <p>
 * NOTE(review): the {@code assertArrayEquals} calls pass the actual array first and the expected
 * array second; JUnit declares the parameters as (expected, actual), so a failure message would
 * label the two values the wrong way round.
 */
@Test public void testWindow() { final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>(); Observable.concat( Observable.just(1, 2, 3, 4, 5, 6) .window(3) .map(new Function<Observable<Integer>, Observable<List<Integer>>>() { @Override public Observable<List<Integer>> apply(Observable<Integer> xs) { return xs.toList().toObservable(); .blockingForEach(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> xs) { lists.add(xs); assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 }); assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 }); assertEquals(2, lists.size());
/**
 * The hand-written source keeps emitting increasing integers, counting each emission, until the
 * {@code Disposable} it passed to {@code onSubscribe} reports disposed. After
 * {@code take(100).take(1)} the test asserts that exactly one emission happened.
 * <p>
 * The emit loop has no other exit condition, so the {@code timeout} on the annotation is what fails
 * the test if the source is never disposed.
 */
@Test(timeout = 2000) public void testMultiTake() { final AtomicInteger count = new AtomicInteger(); Observable.unsafeCreate(new ObservableSource<Integer>() { @Override public void subscribe(Observer<? super Integer> observer) { Disposable bs = Disposables.empty(); observer.onSubscribe(bs); for (int i = 0; !bs.isDisposed(); i++) { System.out.println("Emit: " + i); count.incrementAndGet(); observer.onNext(i); }).take(100).take(1).blockingForEach(new Consumer<Integer>() { @Override public void accept(Integer t1) { System.out.println("Receive: " + t1); assertEquals(1, count.get());
/**
 * Converts {@code normal.completable} to an {@code Observable} and blocks on it with a no-op
 * consumer; no exception is expected.
 * <p>
 * NOTE(review): {@code normal} is declared outside this excerpt — presumably a fixture wrapping a
 * completable that finishes without error; confirm against its declaration.
 */
@Test(timeout = 5000) public void toObservableNormal() { normal.completable.toObservable().blockingForEach(Functions.emptyConsumer());
/**
 * Converts {@code error.completable} to an {@code Observable} and blocks on it with a no-op
 * consumer; the test expects {@code TestException} to be thrown.
 * <p>
 * NOTE(review): {@code error} is declared outside this excerpt — presumably a fixture wrapping a
 * completable that signals {@code TestException}; confirm against its declaration.
 */
@Test(timeout = 5000, expected = TestException.class) public void toObservableError() { error.completable.toObservable().blockingForEach(Functions.emptyConsumer());
/**
 * Runs {@code scan} over the "HTTP-ClusterB" event stream, seeded with an empty {@code HashMap};
 * the accumulator stores each event's {@code instanceId} under the key "instance" and returns the
 * same map it was given. The first 10 results are printed via {@code blockingForEach}, after which
 * the test sleeps for 200 ms.
 * <p>
 * The test contains no assertions.
 */
@Test public void testUnsubscribeScan() throws Exception { ObservableEventStream.getEventStream("HTTP-ClusterB", 20) .scan(new HashMap<String, String>(), new BiFunction<HashMap<String, String>, Event, HashMap<String, String>>() { @Override public HashMap<String, String> apply(HashMap<String, String> accum, Event perInstanceEvent) { accum.put("instance", perInstanceEvent.instanceId); return accum; .take(10) .blockingForEach(new Consumer<HashMap<String, String>>() { @Override public void accept(HashMap<String, String> pv) { System.out.println(pv); Thread.sleep(200); // make sure the event streams receive their interrupt
Observable.merge(source).take(6).blockingForEach(new Consumer<Long>() {
/**
 * Merges the "HTTP-ClusterA" and "HTTP-ClusterB" event streams, groups the events by
 * {@code event.type}, and takes only the first group. That group is printed and consumed with
 * {@code v.take(1).subscribe()}; the test then prints a marker and sleeps for 200 ms.
 * <p>
 * The test contains no assertions.
 */
@Test public void testTakeUnsubscribesOnGroupBy() throws Exception { Observable.merge( ObservableEventStream.getEventStream("HTTP-ClusterA", 50), ObservableEventStream.getEventStream("HTTP-ClusterB", 20) // group by type (2 clusters) .groupBy(new Function<Event, String>() { @Override public String apply(Event event) { return event.type; .take(1) .blockingForEach(new Consumer<GroupedObservable<String, Event>>() { @Override public void accept(GroupedObservable<String, Event> v) { System.out.println(v); v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream System.out.println("**** finished"); Thread.sleep(200); // make sure the event streams receive their interrupt