본문

[2017.12.10] 84. RxJava - Observable 중지 시키기

도입

이번 포스팅에서는 RxJava에서 Observable을 구독 중에 구독을 중지하는 방법을 실습 할 예정이다.

프로젝트 안에서 Observable을 통해 주기적으로 값을 변경하는 로직이 존재했고 (마치 Timer 처럼) onPause가 되었을 때 Observable을 중지해야했다.

RxJava는 유용하지만 해당 스트림을 취소하지 않으면 계속 돌기 때문에 많은 리소스를 사용하게 된다.


실습

아래 코드와 같이 1초마다 값을 변경하는 Observable이 존재

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 1초마다 값을 변경하는 Observable
private Observable<Float> makeDurationSubscr() {
    Observable mDurationSubscr = Observable.create(new ObservableOnSubscribe<Float>() {
        @Override
        public void subscribe(ObservableEmitter<Float> e) throws Exception {
            try {
                for (int i = 1; i < maxDuration+1; i = i + 1) {
                    e.onNext((i*(100/Float.parseFloat(maxDuration+""))));
                    Thread.sleep(1000);
                }
            e.onComplete();
            } catch (Exception ex) {
    
            }
        }
    });
    return mDurationSubscr;
}
 
// 해당 Observable을 실행하는 메소드
private void startDurationTimer() {
    makeDurationSubscr()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            curProgress -> {
                Log.d(TAG, "startDurationTimer: " + curProgress);
                curDuration.set(curProgress);
            }
        );
}
cs


해결 방법

찾아보니 CompositeDisposable 객체에 등록해 관리하므로 Observable을 관리할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Dispose 가능한 객체 선언
private Disposable durationTimer;
 
// Dispose 가능한 객체를 담아 놓을 수 있는 Container 선언
private CompositeDisposable observableDisposal;
 
// 1초마다 값을 변경하는 Observable
private Observable<Float> makeDurationSubscr() {
    Observable mDurationSubscr = Observable.create(new ObservableOnSubscribe<Float>() {
        @Override
        public void subscribe(ObservableEmitter<Float> e) throws Exception {
            try {
                for (int i = 1; i < maxDuration+1; i = i + 1) {
                    e.onNext((i*(100/Float.parseFloat(maxDuration+""))));
                    Thread.sleep(1000);
                }
            e.onComplete();
            } catch (Exception ex) {
    
            }
        }
    });
    return mDurationSubscr;
}
 
// 해당 Observable을 실행하는 메소드
private void startDurationTimer() {
    durationTimer =
        makeDurationSubscr()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                curProgress -> {
                    Log.d(TAG, "startDurationTimer: " + curProgress);
                    curDuration.set(curProgress);
                }
            );
    // Container에 등록
    observableDisposal.add(durationTimer);
}
 
public void onPause() {
    // Dispose 객체 Clear
    observableDisposal.clear();
}
cs



#RxJava #CompositeDisposable

공유

댓글