RxJava w Androidzie – podstawy. Cz. 1

android-128W niniejszym tutorialu chciałbym przedstawić podstawy RxJavy w systemie Android. Zaczniemy od podstawowych pojęć związanych z programowaniem reaktywnym oraz przejdziemy od razu do ich praktycznego zastosowania. Zobaczymy, że wcale nie jest to takie trudne jak się wydaje.

1. Czym jest programowanie reaktywne

Programowanie reaktywne (reactive programming) jest paradygmatem programowania deklaratywnego skupionego na strumieniach danych i propagacji zmian. Dzięki niemu możliwe jest łatwe wyrażenie statycznych (np. tablice) jak i dynamicznych (np. emitery zdarzeń) strumieni danych, a także skomunikowanie ich z zależnym od nich kodem wykonalnym, co ułatwia automatyczna propagacja przepływu zmienionych danych. Tyle dość enigmatycznie na ten temat wikipedia. W praktyce oznacza to, że kiedy otrzymujemy jakieś dane, lub następuje ich zmiana, nasz kod może zareagować na takie zdarzenie i wykonać czy to niezbędne na tych danych operacje, czy po prostu np. zapisać je lub wyświetlić. Reactive Exctensions to zbiór rozszerzeń różnych języków programowania o powyższy paradygmat.

RxJava to po prostu implementacja Reactive Extensions w języku Java. RxAndroid natomiast, to biblioteka dedykowana dla systemu Android wykorzystująca RxJavę w tym środowisku do jego specyficznych zastosowań.

2. Wzorzec projektowy Obserwator (Observer)

Rx opiera się na idei wzorca Obserwator. We wzorcu tym mamy dwie podstawowe klasy obiektów:

  • emiter danych – obiekt który jest źródłem danych, dziedziczący po abstrakcyjnej klasieObservable, implementującą z interface’u ObservableSource metodę subscribe())
  • obserwator – obiekt implementujący interface Observer, który konsumuje dane emitowane przez obiekt obserwowany

Ideą wzorca jest możliwość „zapisanania się” (subscribe) przez Obserwatora na dane spływające od Oberwowanego.

Zobaczmy jak to wygląda w praktyce:

  • naszymi danymi będzie po prostu ciąg tekstowy „Witaj w RxAndroid!
  • deklarujemy Obserwowanego (Observable) którego emitowanym typem danych jest String
  • operatorem just() tworzymy instancję obiektu na podstawie danych
  • tworzymy obiekt Obserwatora (Observer), który zareaguje na otrzymane dane
  • zapisujemy się Obserwatorem u Obserwowanego na dane
  • kiedy Obserwator otrzyma dane, wyświetli je na ekranie aplikacji

app/build.gradle

// importujemy najnowsze biblioteki z https://github.com/ReactiveX/RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'

activity_main.xml

<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout
     xmlns:android="http://schemas.android.com/apk/res/android"
     android:layout_width="match_parent"
     android:layout_height="match_parent">

     <TextView
           android:id="@+id/hello_text"
           android:layout_width="wrap_content"
           android:layout_height="wrap_content"
           android:layout_centerInParent="true" />
</RelativeLayout>

MainActivity.java

// ...
import io.reactivex.Observable;
import io.reactivex.Observer;
// ...

public class MainActivity extends AppCompatActivity {

    private final static String TAG="RxTutorial1";

    private Observable<String> emiter;
    private Observer<String> observer;

    private TextView textView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        textView = findViewById(R.id.hello_text);

        // tworzymy obserwowany obiekt
        emiter = Observable.just("Witaj w RxAndroid!");

        // tworzymy obserwatora
        observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext");
                textView.setText(s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        };

        // zapisujemy się u obserwowanego na dane
        emiter.subscribe(observer);
    }
}

Klika słów wyjaśnienia:

  • emiter i obserwator mają ten sam typ danych (String)
  • tworzony obiekt klasy Observer musi nadpisać cztery metody:
    • onSubscribe – wywoływana w momencie zapisania się na dane
    • onNext – kiedy obserwator otrzyma kolejną porcję danych
    • onError – gdy coś pójdzie nie tak…
    • onComplete – gdy wszystkie dane zostały odebrane

Gdy obserwator zapisze się do emitera, otrzyma natychmiast dane. W metodzie onNext wyświetli je w kontrolce z layoutu.

Mamy oto działający „Hello World” w Rx’ie 🙂 Nie jest to może zbyt praktyczny przykład, chociaż… a gdyby tak chcieć np. opóźnić wyświetlenie tego napisu o 3 sekundy? Proszę bardzo, od tego jest operator delay():

// ...
        emiter
            .delay(3, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
//...

3. Schedulery i wątki

W powyższym przykładzie doszły jeszcze dwie linijki kodu – wywołania subscribeOn oraz observeOn. Wymaga to nieco dłuższego omówienia.

Aby móc równolegle wykonywać w aplikacji różne zadania, potrzebujemy wykorzystać do tego dostępną w systemie Android wielowątkowość. RxJava znacznie nam to ułatwia używając do tego tzw. schedulerów (będę używał tego anglicyzmu, bo polskie tłumaczenie ‚program planujący’ czy ‚rozpiska’ nie za dobrze jakoś wygląda). Scheduler to obiekt zarządzający pulą wątków. Kiedy scheduler potrzebuje wykonać jakieś zadanie, pobiera wątek ze swojej puli i wykonuje je w tym wątku. RxJava udostępnia nam schedulery różnych typów, m.in.:

  • Schedulers.io() – może mieć nieograniczoną pulę wątków, używany jest do zadań nie wymagających intensywnego użycia CPU takich jak interakcje z bazą danych, komunikacja sieciowa czy operacje w systemie plików.
  • Schedulers.computation() – używany do zadań wymagających obliczeń znacznie obciążających CPU, jego pula wątków jest równa liczbie dostępnych dla JVM rdzeni procesorów.
  • Schedulers.newThread() – standardowy, pojedynczy nowy wątek dla zadania wymagającego sekwencyjnego wykonania instrukcji.

Dzięki bibliotece RxAndroid mamy też do dyspozycji wątek specyficzny dla tego systemu:

  • AndroidSchedulers.mainThread() – główny wątek aplikacji gdzie następują interakcje z użytkownikem dzięki UI (User Interface).

Popatrzmy raz jeszcze na ostatni przykład. Kiedy polecamy emiterowi opóźnienie wysłania danych o 3 sekundy, metodą subscibeOn określamy z której puli wątków będzie on korzystał aby się wykonać. W naszym przypadku tworzymy sobie pojedynczy nowy wątek newThread. Następnie używamy metody observeOn, która mówi emiterowi, na którym wątku obserwator będzie nasłuchiwał danych. Użyliśmy tu głównego wątku aplikacji.

Jednym z najczęstszych przypadków z którym się spotkamy pisząc realną aplikację, jest pobieranie danych przez sieć z API. Jak dobrze wiemy, w Androidzie nie możemy wykonać pobrania danych z sieci w głównym wątku, ponieważ zostanie rzucony wyjątek NetworkOnMainThreadException. Musimy do tego celu użyć np. zadania asynchronicznego (AsyncTask). Dzięki Rx’owi mamy to znacznie ułatwione – deklarujemy po prostu, że dane pobierane zostaną w puli wątków IO (Input/Output), a odebrane w głównym wątku UI. Skorzystamy z tej wiedzy w dalszych częściach tutoriala.

4. Disposable – dbamy o pamięć

W realnej aplikacji gdzie często pobieramy dane z API by wyświetlić je w jakieś postaci na ekranie, może nastąpić taka sytuacja: zanim dane zostaną w całości pobrane, użytkownik zdecyduje się np. cofnąć do poprzedniego ekranu (aktywności). Oczywiście aktywność zostanie zniszczona ale subskrypcja obserwatora na zadanie w osobnym wątku cały czas będzie istnieć. Gdy dane przyjdą, obserwator nie będzie miał już widoku do zaktualizowania. Będziemy mieć wyciek pamięci, aplikacja zawiesi się lub zakończy się spektakularnym crashem.

By uchronić się przed tego typu sytuacjami, używamy obiektu Disposable, który pozwala odwołać subskrypcję w przypadku kiedy obserwator nie che już nasłuchiwać dłużej danych z emitera.

Przypatrzmy się metodzie onSubcibe() tworzonego obserwatora. Obiekt typu Disposable jest do niej przekazywany, kiedy obserwator zapisze się na dane. Przypiszmy go sobie do pola klasy naszej aktywności. Teraz kiedy aktywność będzie niszczona i zostanie wywoływana jej metoda onDestroy(), będziemy mogli odwołać subskrypcję i problem z głowy:

public class MainActivity extends AppCompatActivity {
    // ...
    private Disposable disposable;
    // ...

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        // ...
        observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe");
                disposable = d; 
            }
       // ...
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

Dane nie zostaną wysłane z emitera jeśli nie będzie istniał jakikolwiek obserwator na te dane zapisany.

Powyższy kod można jeszcze uprościć. Z pomocą przychodzi nam klasa DisposableObserver, która implementuje jednocześnie interfejs Disposable oraz Observer. Użyjmy więc tej klasy tam gdzie definiujemy naszego obserwatora:

        observer = new DisposableObserver<String>() {
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        };

Jak widać, nie musimy nadpisywać metody onSubcribe. Nie musimy też zapamiętywać disposable’a w prywatnym polu naszej aktywności. Teraz w metodzie onDestroy wystarczy wywołać:

    @Override
    protected void onDestroy() {
        super.onDestroy();
        observer.dispose();
    }

Jeszcze jedna rzecz. Jeśli zdefiniujemy kilku obserwatorów, musielibyśmy w onDestroy wywoływać dispose na każdym z nich. Możemy jednak wszystkich zgrupować w obiekcie CompositeDisposable:

// dodajemy pole w aktywności
private CompositeDisposable compositeDisposable = new CompositeDisposable();

// ...
// dodajemy obserwatorów
    compositeDisposable.add(observer1);
    compositeDisposable.add(observer2);
// ...

Teraz możemy wyczyścić (wywołać na każdym dispose) wszystkich dodanych do tego kontenera obserwatorów:

    @Override
    protected void onDestroy() {
        super.onDestroy();
        compositeDisposable.clear();
    }

Na ten moment to wszystko. Umiemy tworzyć emitery danych (Observable) oraz obserwatorów (Observer) potrafiących te dane odebrać. Umiemy też posprzątać po sobie, jeśli aktywność lub fragment w którym działają obserwatorzy, zostanie zniszczona. W kolejnej części przejdziemy do nieco bardziej zaawansowanych przykładów, gdzie emitowanymi danymi będą kolekcje (listy) obiektów, a obróbką tych danych zajmą się takie operatory jak map() czy flatMap(). W końcu będziemy mogli łatwo odebrać i przetworzyć dane z realnego API dzięki bibliotece Retrofit.

Podziel się:
Facebooktwitterredditpinterestlinkedintumblrmail

1 thought on “RxJava w Androidzie – podstawy. Cz. 1

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *