RxJava Observable
Observable class 는 Observer 디자인 패턴을 구현한다. 옵저버 패턴은 객체의 상태 변화를 관찰자 목록에 객체를 등록하고, 상태 변화 발생할때마다 함수를 호출하여 옵저버의 상태 변경에 대한 알림을 전달한다.
Observable 종류
Observable: 가장 기본적인 형태로서, 0 ~ N 개의 데이터 발행 (BackPressure 없음)Single: 단 1개의 데이터 발행 및 구독 처리Maybe: 0개 or 1개 데이터 발행 및 구독,successorfailorcomplete결과 처리Completable:successorfail결과만 발행Flowable: 0 ~ N 개 데이터 발행 (BackPressure 있음)
Hot Observable vs Cold Observable
Hot Observable: 구독자의 존재 여부와 상관없이 데이터를 발행하는Observable- 다수의 구독자를 고려 가능하나, 구독자가
Observable의 데이터를 처음부터 구독했는지 보장 불가 - 시스템 이벤트, 센서 데이터, 주식 가격 등
- 다수의 구독자를 고려 가능하나, 구독자가
Cold Observable: 구독자가 구독을 하는 순간 데이터를 발행하는Observable- 구독자가 구독을 하면, 준비된 데이터를 처음부터 발 처리
- 웹 요청, 데이터베이스 조회, 파일 읽기 등
Hot Observable 를 Cold Observable 로 변환해주는 Subject class
Subject Class 는 Hot Observable 에서 Cold Observable 로 변환해줄 수 있으며, 발행자와 구독자의 속성을 모두 가지고 있다.
RxJava 의 주요 Subject Class
AsyncSubject
Observable에서 발행한 마지막 데이터 구독onComplete()함수가 호출되는 순간 마지막 데이터 발행 및 구독 처리
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(data -> System.out.println("subscribe #1 : " + data));
subject.onNext("one");
subject.onNext("two");
subject.subscribe(data -> System.out.println("subscribe #2 : " + data));
subject.onNext("three");
subject.onComplete();
// subscribe #1 : three
// subscribe #2 : threeBehaviorSubject
- 구독하는 순간, 가장 최근 데이터 혹은 기본값 데이터를 구독

BehaviorSubject<String> subject = BehaviorSubject.createDefault("BLUE");
subject.subscribe(data -> System.out.println("subscribe #1 : " + data));
subject.onNext("RED");
subject.onNext("GREEN");
subject.subscribe(data -> System.out.println("subscribe #2 : " + data));
subject.onNext("WHITE");
subject.onComplete();
// subscribe #1 : BLUE
// subscribe #1 : RED
// subscribe #1 : GREEN
// subscribe #2 : GREEN
// subscribe #1 : WHITE
// subscribe #2 : WHITEPublishSubject
- 가장 평범한
SubjectClass 로서, 오직 구독자만 구독할 경우에만 데이터를 발행
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(data -> System.out.println("subscribe #1 : " + data));
subject.onNext("one");
subject.onNext("two");
subject.subscribe(data -> System.out.println("subscribe #2 : " + data));
subject.onNext("three");
subject.onComplete();
// subscribe #1 : one
// subscribe #1 : two
// subscribe #1 : three
// subscribe #2 : threeReplaySubject
- 새로운 구독자가 생기면 데이터의 처음부터 끝까지 다시 발행 (메모리 누수 위험)

ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(data -> System.out.println("subscribe #1 : " + data));
subject.onNext("one");
subject.onNext("two");
subject.subscribe(data -> System.out.println("subscribe #2 : " + data));
subject.onNext("three");
subject.subscribe(data -> System.out.println("subscribe #3 : " + data));
subject.onComplete();
// subscribe #1 : one
// subscribe #1 : two
// subscribe #2 : one
// subscribe #2 : two
// subscribe #1 : three
// subscribe #2 : three
// subscribe #3 : one
// subscribe #3 : two
// subscribe #3 : threeConnectableObservable
Observable에서 데이터를 발행하더라도 데이터 발행을 유예하고 특정 시점(connect() 함수 호출 시점)에 발행 가능
String[] arr = {"one", "two", "three"};
Observable<String> observable = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(i -> arr[i])
.take(arr.length);
ConnectableObservable<String> source = observable.publish();
source.subscribe(data -> System.out.println("subscribe #1 : " + data));
source.subscribe(data -> System.out.println("subscribe #2 : " + data));
source.connect();
Thread.sleep(200);
source.subscribe(data -> System.out.println("subscribe #3 : " + data));
Thread.sleep(100);
// subscribe #1 : one
// subscribe #2 : one
// subscribe #1 : two
// subscribe #2 : two
// subscribe #1 : three
// subscribe #2 : three
// subscribe #3 : threesubscribeOn() vs observeOn()
subscribeOn():Observable객체 실행Thread지정observeOn(): 연쇄적인 연산 실행Thread지정
List<Figure> figures = Arrays.asList(
new Figure("Red", "Ball"),
new Figure("Green", "Ball"),
new Figure("Blue", "Ball")
);
Observable.fromIterable(figures)
.subscribeOn(Schedulers.computation()) // 내부적으로 Thread pool 생성
.subscribeOn(Schedulers.io()) // 필요할때마다 Thread 생성
.doOnSubscribe(data -> PrintUtil.printData("doOnSubscribe"))
.doOnNext(data -> PrintUtil.printData("doOnNext [init] : ", data))
.observeOn(Schedulers.newThread()) // 매번 새로운 Thread 생성
.map(data -> {data.shape = "Square"; return data;})
.doOnNext(data -> PrintUtil.printData("doOnNext [Ball -> Square] : ", data))
.observeOn(Schedulers.newThread())
.map(data -> {data.shape = "Triangle"; return data;})
.doOnNext(data -> PrintUtil.printData("doOnNext [Square -> Triangle] : ", data))
.observeOn(Schedulers.newThread())
.subscribe(data -> System.out.println("subscribe : " + data));class Figure {
String color;
String shape;
public Figure(String color, String shape) {
this.color = color;
this.shape = shape;
}
}
class PrintUtil {
static void printData(String message) {
System.out.println("" + Thread.currentThread().getName() + " | " + message);
}
static void printData(String message, Object obj) {
System.out.println("" + Thread.currentThread().getName() + " | " + message + " | " + obj.toString());
}
}Flowable
BackPressure를 제어할 수 있는Observable
BackPressure
Observable에서 데이터를 발행하는 속도를Observer의 구독하는 속도가 따라가지 못하는 현상 (BackPressure또는배압)- 메모리가 overflow 가 되면서
OutOfMemoryError발생 가능
BackPressure 방지를 위한 Flowable
Observable 의 BackPressure
Observable는 배압 제어를 못하기 때문에 10000 건 모두 발행한다.
private static void backPressure() throws Exception {
PrintUtil.printData("backPressure()");
Observable.range(1, 10000)
.doOnNext(i -> PrintUtil.printData("doOnNext", i))
.observeOn(Schedulers.io())
.subscribe(i -> {
PrintUtil.printData("subscribe", i);
Thread.sleep(100);
});
Thread.sleep(10000);
}Flowable 의 BackPressure 제어
Flowable는 일정량 발행이 되면, 발행을 제어하고 구독 처리된다.
private static void noBackPressure() throws Exception {
PrintUtil.printData("noBackPressure()");
Flowable.range(1, 10000)
.doOnNext(i -> PrintUtil.printData("doOnNext", i))
.observeOn(Schedulers.io())
.subscribe(i -> {
PrintUtil.printData("subscribe", i);
Thread.sleep(100);
});
Thread.sleep(10000);
}Observable vs Flowable
- When to use
Observable? 10,000개 미만의 이벤트 데이터 흐름이 발생하는 경우 - When to use
Flowable? 10,000개 이상의 데이터 흐름이 발생하는 경우- 디스크에서 파일 읽는 경우
- DB 에서 대용량 데이터 읽는 경우
- 네트워크 I/O 실행하는 경우
BackPressure Strategy
Flowable구현일지라도 배압 제어를 통해MissingBackpressureException을 방지 필수
| Field | 내용 |
|---|---|
| MISSION | 배압 전략 없음 |
| ERROR | 배압 현상 발생시 MissingBackpressureException 발생 |
| BUFFER | 데이터를 소비할 때까지 데이터를 Buffer 에 저장. 하지만 OutOfMemoryError 발생 가능 |
| DROP | 배압 현상이 발생한 경우 발행되는 데이터 모두 버림 |
| LATEST | 구독자가 새로운 데이터 구독 준비될 때까지 최신 데이터만 유지하고 나머지는 버림 |
Flowable BackPressure Strategy 3가지 연산자
onBackPressureBuffer()onBackPressureDrop()onBackPressure=Latest()
출처
- 길은 가면, 뒤에 있다. - [RxJava] RxJava 프로그래밍(1) - 리액티브 프로그래밍
- HERSTORY [RxJava] RxJava 이해하기 - 7. Backpressure와 Flowable