본문 바로가기

프로그래밍 서적/RxJava 프로그래밍

2. Observable 처음 만들기

반응형

Observable은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 함.(RxJava에서 정말 중요한 개념)

Observable class와 그의 파생 클래스에 대해 알아보는 챕터(Observable, Maybe, Flowable class)


2.1 Observable class

옵서버(Observer) 패턴을 구현하며 이는 객체의 상태 변화를 관찰하는 관찰자(옵서버) 목록을 객체에 등록합니다. 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵서버에게 변화를 알려줍니다.

Observable은 "현재는 관찰되지 않지만 이론을 통해서 앞으로 관찰할 가능성을 의미"한다.

버튼 클릭 시 원하는 처리를 하는 것 역시 옵서버 패턴의 대표적인 예

RxJava의 Observable은 세 가지의 알림을 구독자에게 전달


1. onNext : Observable이 데이터의 발행을 알림. 기존의 옵서버 패턴과 같음

2. onComplete : 모든 데이터의 발행을 완료했음을 알림. onComplete 이벤트는 단 한 번만 발생하며, 발생한 후에는 더 이상 onNext 이벤트가 발생해선 안 됩니다.

3. onError : Observable에서 어떤 이유로 에러가 발생했음을 알림. onError 이벤트가 발생하면 이후에 onNext 및 onComplete 이벤트가 발생하지 않습니다. 즉, Observable의 실행을 종료.


Observable을 생성할 때는 직접 인스턴스를 만들지 않고 정적 팩토리 함수를 호출. 



2.1.1 just() 함수

데이터를 발행하는 가장 쉬운 방법은 기존의 자료구조를 사용하는 것입니다. 

just()함수는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성(실제 데이터 발행은 subscribe()함수를 호출해야 시작).

인자는 한 개에서 최대 10개까지 넣을 수 있음. 단 타입은 모두 같아야 합니다.



2.1.2 subscribe() 함수와 Disposable 객체

RxJava는 내가 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절할 수 있습니다. 이때 사용하는 것이 subscribe() 함수

Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행.



NOTE_ RxJava는 선언형 프로그래밍

Rxjava는 선언형 프로그래밍을 지향합니다. 이는, 명령형 프로그래밍(imperative programming)의 반대말로 어떤 방법(how)으로 동작하는지가 아니라 

프로그래밍할 대상이 무엇(what)인지 알려주는 것을 의미. 예를 들어 명령형 프로그래밍 언어에서는 실행할 알고리즘과 동작을 구체적으로 선언형 프로그래밍은 목표를 명시할 뿐 실행할 알고리즘을 명시하지 않습니다.



subscribe 원형은 책 참고---


subscribe() 함수 원형은 모두 Disposable 인터페이스의 객체를 리턴.

Disposable은 Subscription(구독) 객체에 해당(2개의 함수만 존재(void dispose(), boolean isDisposed()))


NOTE_ Disposable 인터페이스의 함수

dispose()는 Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수.

Observable이 onComplete 알림을 보냈을 때, 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊습니다.

즉, 정상적으로 onComplete 발생 시 dispose() 함수를 따로 호출할 필요 없음

isDisposed() 함수는 Observable이 데이터를 발행하지 않는지(구독을 해지했는지) 확인하는 함수.(코드 2-2 참고)



2.1.3 create() 함수

데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하는 just() 함수와 달리, onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 하는 함수.

       

     

NOTE_ 람다 표현식 활용

create()함수의 인자는 원래 ObservableOnSubscribe 인터페이스 타입이어야 하지만, 코드 2-3에서는 ObservableEmitter 인터페이스 객체를 

인자로 받는 람다 표현식으로 처리. 람다표현식을 활용하면 Observable.create()를 호출할 때 불필요한 익명 객체나 멤버 변수를 기재하지 않고 

꼭 필요한 변수만 소스 코드에 작성하면 되므로 소스 코드의 가독성이 높아짐.



람다 표현식과 메서드 레퍼런스를 사용할 때는 다음 우선순위를 고려해서 사용여부를 판단하기를 권함.

1. 메서드 레퍼런스로 축약할 수 있는지 확인

2. 그 다음 람다 표현식을 활용할 수 있는지 확인.

3. 1~2를 활용할 수 없으면 익명 객체나 멤버 변수로 표현.


NOTE_ Observable.create()를 사용할 때는 주의해야 합니다.

RxJava에 익숙한 사용자만 create()를 사용하도록 권고함. 꼭 사용해야하는 사항은 아래와 같습니다.

1. Observable이 구독해지(dispose)되었을 때 등록된 콜백을 모두 해제해야 합니다. 그렇지 않으면 잠재적으로 메모리 누수(memory leak)가 발생합니다.

2. 구독자가 구독하는 동안에만 onNext와 onComplete 이벤트를 호출해야 합니다.

3. 에러가 발생했을 때는 오직 onError 이벤트로만 에러를 전달해야 합니다.

4. 배압(back pressure)을 직접 처리해야 합니다.



NOTE_ RxJava 2의 Consumer<T>와 자바 8의 Consumer<T>

자바 8에서 제공하는 함수형 인터페이스 :

java.util.function.Consumer / Predicate / Function

RxJava 2에서 제공하는 같은 기능을 하는 함수형 인터페이스 :

io.reactivex.functions.Consumer / Predicate / Function



2.1.4 fromArray() 함수

단일 데이터가 아닐 때 사용하는 함수 : fromXXX() 계열 함수

fromArray() 함수는 코드 2-7처럼 숫자 뿐만 아니라 사용자 정의 클래스 객체도 넣을 수 있다.



NOTE_ int[] 배열은 어떻게 처리하지?

명시적 래퍼 타입인 Integer[]이 아닌, 실제 코드 작성 시 사용하는 int[] 사용 시, 원하는 결과 값이 나오지 않습니다.

RxJava에서 int 배열을 Integer[]로 변환해야 인식하며, 다음과 같은 방법을 이용합니다.

private static Integer[] toIntegerArray(int[] intArray) {

return IntStream.of(intArray).boxed().toArray(Integer[]::new);

}

위 함수를 통해 int[] 를 Integer[] 로 변환시켜 사용해야 합니다.



2.1.5 fromIterable() 함수

Observable을 만드는 다른 방법은 Iterable 인터페이스를 구현한 클래스에서 Observable 객체를 생성하는 것입니다.

Iterable 인터페이스는 박복자(iterator)를 반환

Iterator 인터페이스는 이터레이터 패턴을 구현한 것으로 다음에 어떤 데이터(아이템)가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는 장점이 있습니다.

Iterator 인터페이스에는 hasNext(), next() 메서드가 있음.

Iterator<E> 인터페이스를 구현하는 대표적인 클래스는 ArrayList(List 인터페이스), ArrayBlockingQueue(BlockingQueue 인터페이스), HashSet(Set 인터페이스),

LinkedList, Stack, TreeSet, Vector 등이 있음.

        

    

NOTE_ Map 객체에 관한 Observable class의 from() 함수는 없을까?

Map 인터페이스는 배열도 아니고 Iterable<E> 인터페이스를 구현하지 않았으므로 from() 계열 함수는 존재하지 않습니다. key-value 로 구성되어있어 keySet() 함수의 

순서를 정하면 만들 수 있을 것 같다는 생각...



2.1.6 fromCallable() 함수

RxJava는 비동기 프로그래밍을 하기 위한 라이브러리입니다. 먼저 살펴볼 인터페이스는, 동시성 API인 Callable 인터페이스로 비동기 실행 후 결과를 반환하는 cal() 메서드를 정의.

기존에 사용하던 callable 객체가 있다면 Observable.fromCallable() 을 활용하여 바로 RxJava와 연동합니다.



2.1.7 fromFuture() 함수

Future 인터페이스 역시 동시성 API로 비동기 계산의 결과를 구할 때 사용



2.1.8 fromPublisher() 함수

Observable에서 제공하는 fromXXX() 계열 함수의 마지막. 자바 9의 표준인 Flow API의 일부. 



2.2 Single class

Observable class는 데이터를 무한하게 발행할 수 있지만, Single class는 오직 1개의 데이터만 발행하도록 한정.

결과가 유일한 서버 API를 호출할 때 유용하게 사용할 수 있습니다.

중요한 것은 데이터 하나가 발행과 동시에 종료(onSuccess) 된다는 점(onNext()와 onComplete() 함수가 onSuccess()로 통합된 것).

따라서, Single class의 life cycle 함수는 onSuccess(T value) 함수와 onError() 함수로 구성됩니다.



2.2.1 just() 함수

가장 간단한 방법은 Observable 처럼 정적 팩토리 함수 just()를 호출하는 것.


        

2.2.2 Observable에서 Single class 사용

Single은 Observable의 특수한 형태이므로 Observable에서 변환할 수 있음.



2.2.3 Single class의 올바른 사용 방법

just()함수에 여러 개의 값을 넣으면 에러 발생



NOTE_ 함수인가? 메서드인가?

리액티브 프로그래밍은 함수형 프로그래밍 기법을 활용하므로 용어를 혼용해서 사용 가능.



2.3 Maybe 클래스

Observable의 또 다른 특수 형태로서, Single class와 마찬가지로 최대 데이터 하나를 가질 수 있지만, 

데이터 발행 없이 바로 데이터 발생(Single class 는 1개 완료, Maybe class는 0 or 1개 완료)을 완료할 수도 있음.

즉, Maybe class는 Single class에 onComplete 이벤트가 추가된 형태


Maybe 객체를 생성할 수 있는 리액티브 연산자에는 elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등이 있음



2.4 뜨거운 Observable

Observable은 차가운과 뜨거운이 있습니다. 

차가운 Observable은 "냉동식품"과 같습니다. 

Observable을 선언하고 just(), fromIterable() 함수를 호출해도 옵서버가 subscribe() 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않습니다. 다른 말로 게이른 접근법 입니다.


뜨거운 Observable은 구독자가 존재 여부와 관계없이 데이터를 발행함. 따라서 여러 구독자를 고려할 수 있습니다. 

단, 구독자로서는 Observable에서 발행하는 데이터를 처음부터 모두 수신할 것으로 보장할 수 없습니다.


즉, 차가운 Observable은 구독자가 구독하면 준비된 데이터를 처음부터 발행하지만, 뜨거운 Observable은 구독한 시점부터 Observable에서 발행한 값을 받습니다.

        

차가운 Observable의 예는 웹 요청, db 쿼리와 파일 읽기 등입니다. 보통 내가 원하는 URL 이나 데이터를 지정하면 그때부터 서버나 db 서버에 요청을 보내고 결과를 받아옴

뜨거운 Observable의 예는 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터와 주식 가격 등이 있음. 



NOTE_ 구독자가 여러 명이라는 것은 무슨 의미?

RxJava의 "구독자가 여러 명이다."의 의미를 예로 들어 설명하자면, 요청한 결과로 반환된 JSON 문서를 파싱해 원하는 속성을 추출한다고 해보자. 

날씨 정보, 지역 정보, 시간 정보를 반환하는 경우 RxJava에서는 위의 세가지 정보를 구독자라 생각하면 이해가 갈 것이다.

데이터의 원천은 한 곳이지만 내가 최종적으로 원하는 결과 데이터가 여러 종류일 때는 각각을 구독자로 생각하면 좋다.



뜨거운 Observable에서는 "배압(back pressure)"을 고려해야 한다. 배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생합니다.

차가운 Observable을 뜨거운 Observable 객체로 변환하는 방법은 Subject 객체를 만들거나 ConnectableObservable 클래스를 활용하는 것이다.


    

2.5 Subject 클래스

특성 : Observable의 속성과 구독자의 속성이 모두 있다는 점.(차가운 Observable를 뜨거운 Observable로 바꿀 수 있음)

Observable처럼 데이터를 발행할 수도 있고 구독자처럼 발행된 데이터를 바로 처리할 수도 있습니다.

제공하는 주요 클래스는 AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있다.



2,5,1 AsyncSubject class

Observable에서 발행된 마지막 데이터를 얻어올 수 있는 Subject class. 완료되기 전 마지막 데이터에만 관심이 있으며, 이전 데이터는 무시.

처리 흐름

1. 처음 구독자가 subscribe() 함수를 호출합니다.

2. 이후에 '1'원, '3'원이 발행된 후 두 번째 구독자가 subscribe() 함수를 호출합니다.

3. 마지막으로 '5'원이 발행되고 데이터 발행을 완료(onComplete 이벤트)합니다.



NOTE_ 마블 다이어그램으로 RxJava 이해하기

RxJava를 이해하는 데 가장 중요한 것을 '시간'이라고 생각하고 책을 보면서 마블 다이어그램을 살펴본 후 예제 코드를 살펴보는 과정을 갖자.



2.5.2 BehaviorSubject class

구독자가 구독을 하면 가장 최근 값 혹은 기본 값을 넘겨주는 클래스. 



2.5.3 PublishSubject class

구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작. 오직 해당 시간에 발생한 데이터를 그대로 구독자에게 전달받습니다.

별도의 기본값을 제공하지 않으므로 AsyncSubject class처럼 create() 함수를 호출해 생성합니다.



2.5.4 ReplaySubject class

가장 특이하고 사용 시 주의해야하는 class.

Subject class의 목적은 뜨거운 Observable을 활용하는 것인데 차가운 Observable처럼 동작하기 때문.

구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해줍니다. 마치 테이프로 전체 내용을 녹음해두었다가 새로운 사람이 들어오면 정해진 음악을 처음부터 들려주는 것과 같음.

그러므로, 모든 데이터 내용을 저장해두는 과정 중 메모리 누수가 발생할 가능성을 염두에 두고 사용할 때 주의해야 합니다.



NOTE_ 데이터 발행자와 수신자

데이터 발행자(data source) : Observable, Single, Maybe, Subject, Completable

데이터 수신자(data receiver) : 구독자(Subscriber), 옵서버(Observer), 소비자(Consumer)

데이터 발행자는 클래스 개념이므로 명확하게 구분해야 함. 데이터 수신자는 몇 가지 용어를 번갈아 가면서 사용하며 개념이 명확하게 구분되지 않음. 

구독자 : RxJava에서 Observable과 연결할 때는 subscribe() 함수를 호출. 이 과정이 구독이므로 구독자가 됩니다.

옵서버 : RxJava는 옵서버 패턴을 구현. 데이터 발생자는 Observable이 되고 데이터 수신자를 옵서버라고 할 수 있음

소비자 : RxJava 1.x에서는 subscribe() 함수를 호출할 때 Subscriber 클래스를 인자로 넘겼지만, RxJava 2에서는 모두 함수형 인터페이스인 Consumer를 인자로 넘김.



2.6 ConnectableObservable class

이번 장에서는 주어진 데이터를 발행하려고 just()나 fromArray() 함수를 사용할 때도 있었고 PublishSubject, AsyncSubject class처럼 Observable이면서 옵서버도 되는 Subject class도 살펴보았음

정의 : Subject class처럼 차가운 Observable을 뜨거운 Observable로 변환하며, Observable를 여러 구독자에게 공유할 수 있으므로 원 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용

특이점 : subscribe()함수를 호출해도 아무 동작이 일어나지 않는다는 것 -> 새로 추가된 connect()함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행하기 떄문

객체 생성 방법 : Observable에 publish() 함수를 호출(여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지 데이터 발행을 유예하는 역할)

즉, connect()함수를 호출해야 그때까지 구독했던 구독자 모두에게 데이터를 발행



2.7 마치며

Observable의 생성은 just(), fromArray() 등의 팩토리 함수를 이용

Observable class는 데이터를 발행하는 팩토리 함수뿐만 아니라 그것을 처리할 수 있는 연산자도 함께 제공 => 팩토리 함수와 연산자를 함께 사용하는 기법을 "메서드 체이닝(method chaining)"이라 함





2장까지 마쳤습니다.







해당 예제 코드들은 


https://github.com/JC-Choo/RxJava


여기를 참고하세요!!! 저도 그렇지만, 읽어보는게 다가 아닙니다. 읽고 써보고 익숙해져야 합니다.


모두 힘내서 RxJava !!! 공부합시다!!!(추운데 몸조심하세요 저처럼 감기걸리면 몸이 힘들어요...)

반응형