RXJAVA LÀ GÌ

RxJava, RxAndroid cơ bản.Bạn đang xem: Rxjava là gì

I. Reactive Programming là gì?

Reactive Programing mà một phương pháp lập trình tập trung vào các luồng dữ liệu không đồng bộ và quan sát sự thay đổi của các luồng dữ liệu không đồng bộ đó, khi có sự thay đổi sẽ có hành động xử lý phù hợp. Vì đây là luồng dữ liệu không đồng bộ nên các module code cùng lúc chạy trên các thread khác nhau từ đó rút ngắn thời gian thực thi mà không làm block main thread.

Bạn đang xem: Rxjava là gì

Bạn đang xem: Rxjava là gìBạn đang xem: Rxjava là gì

II. RxJava

RxJava cơ bản là một thư viện cung cấp các sự kiện không đồng bộ được phát triển theo Observer Pattern. Bạn có thể tạo luồng dữ liệu không đồng bộ trên bất kỳ thread nào, thay đổi dữ liệu và sử dụng dữ liệu bằng Observer. Thư viện RxJava cung cấp nhiều loại Operator tuyệt vời như map, combine, merge , filter và nhiều thứ khác có thể được áp dụng cho luồng dữ liệu.

III. RxAndroid

RxAndroid được đặc biệt sử dụng cho nền tảng Android được phát triển dựa trên RxJava. Đặc biệt Schedulers được bổ sung cho RxAndroid nhằm hỗ trợ cho đa luồng trong ứng dụng Android. Schedulers sẽ giúp bạn phân chia luồng chạy cho từng module code sao cho phù hợp. Một vài luồng chạy phổ biến được sử dụng qua Schedulers.

AndroidSchedulers.mainThread () Cung cấp quyền truy cập vào Android Main Thread / UI Thread.Schedulers.newThread () Thread mới sẽ được tạo ra mỗi khi một nhiệm vụ được tạo.

IV. Những thành phần quan trọng trong RxJava:

Về cơ bản RxJava có hai thành phần chính: Observable và Observer. Thêm vào đó, có những thứ khác như Schedulers, Operators và Subscription là các thành phần đóng vai trò như đa luồng, thao tác dữ liệu, và kết nối. Chúng ta sẽ cùng làm quen với từng thành phần: Observable: Là luồng dữ liệu thực hiện một số công việc và phát ra dữ liệu.Observer : Là thành phần đi kèm không thể thiếu của Observable. Nó nhận dữ liệu được phát ra bởi Observable. Subcription: Là mối liên kết giữa Observable và Observer. Có thể có nhiều Observer đăng ký một Observable duy nhất. Operator: Hỗ trợ cho việc sửa đổi dữ liệu được phát ra bởi Observable trước khi observer nhận chúng. Schedulers: Scheduler quyết định thread mà Observable sẽ phát ra dữ liệu và trên thread nào Observer sẽ nhận dữ liệu.

Xem thêm: Gia Thế "Khủng" Của Chồng Hoa Hậu Jennifer Phạm Lần Đầu Khoe Con Gái Thứ Tư

1. Cách tạo Observable

*

Đầu tiên chúng ta sẽ điểm qua một vài phương pháp phổ biến để tạo ra Observable:

2. Cách tạo Observer

Đối với mỗi loại Observer khác nhau chúng ta có cách tạo và thực thi khác nhau nhưng đều khá đơn giản. Đây là ví dụ điển hình nhất để tạo ra Observer:

3. Tạo Observer theo dõi Observable

Đây là các phương thức cơ bản để khiến cho Observer đăng ký theo dõi Observable.

animalsObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(animalsObserver);subscribeOn(Schedulers.io ()): Báo cho Observable chạy nhiệm vụ trên một chuỗi nền.observOn(AndroidSchedulers.mainThread ()): Yêu cầu Observer nhận dữ liệu trên luồng chính để bạn có thể thực hiện các hành động liên quan đến giao diện.

4. Disposable

Disposable được sử dụng để hủy sự kết nối của Subserver với Subsevable khi không còn cần thiết việc này rất hữu dụng để tránh việc rò rỉ bộ nhớ. Khi Observer kết nối được với Observable trong onSubcribe() ta sẽ nhận được Disposable. Để hủy sự kết nối trong onDestroy() của Activity bạn nên gọi hàm dispose() của Disposable.

5. Operator

RxJava cung cấp tập hợp lớn các operator hỗ trợ cho việc thao tác với dữ liệu vậy nên operators được phân chia dựa trên loại công việc chúng làm. Ví dụ như nhóm tạo Observable: create, just, fromArray,... Nhóm lọc dữ liệu: filter, skip, last, take, ... Nhóm tạo Observable từ dữ iệu của Observable khác như: buffer, map, flatmap,...Lưu ý khi sử dụng nhiều Operator thì kết quả của Operator trước sẽ truyền cho Operator sau.Bạn có thể tìm hiểu thêm tại đây

V. Ví dụ:

Sau đây là ví dụ cụ thể cho từng loại Observable được đề cập phía trên:Trong các ví dụ mình sử dung Custom object Note:

public class Note { int id; String note; // getters an setters}

1. Observable & Observer:

Được sử dụng nhiều nhất trong số tất cả. Observable có thể phát ra không hoặc nhiều phần tử.

public class ObserverActivity extends AppCompatActivity { private static final String TAG = ObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Simple Observable emitting multiple Notes * - * Observable : Observer */ Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_observer); Observable notesObservable = getNotesObservable(); Observer notesObserver = getNotesObserver(); notesObservable.observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribeWith(notesObserver); } private Observer getNotesObserver() { return new Observer() { Override public void onComplete() { Log.d(TAG, "onComplete"); } }; } private Observable getNotesObservable() { final List notes = prepareNotes(); return Observable.create(new ObservableOnSubscribe() { Override public void subscribe(ObservableEmitter emitter) throws Exception { for (Note note : notes) { if (!emitter.isDisposed()) { emitter.onNext(note); } } // all notes are emitted if (!emitter.isDisposed()) { emitter.onComplete(); } } }); } private List prepareNotes() { List notes = new ArrayList(); notes.add(new Note(1, "Buy tooth paste!")); notes.add(new Note(2, "Call brother!")); notes.add(new Note(3, "Watch Narcos tonight!")); notes.add(new Note(4, "Pay power bill!")); return notes; } Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output:

public class SingleObserverActivity extends AppCompatActivity { private static final String TAG = SingleObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Single Observable emitting single Note * Single Observable is more useful in making network calls * where you expect a single response object to be emitted * - * Single : SingleObserver */ // TODO - link to Retrofit tutorial Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_single_observer); Single noteObservable = getNoteObservable(); SingleObserver singleObserver = getSingleObserver(); noteObservable .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(singleObserver); } private SingleObserver getSingleObserver() { return new SingleObserver() { Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.getMessage()); } }; } private Single getNoteObservable() { return Single.create(new SingleOnSubscribe() { Override public void subscribe(SingleEmitter emitter) throws Exception { Note note = new Note(1, "Buy milk!"); emitter.onSuccess(note); } }); } Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output

public class CompletableObserverActivity extends AppCompatActivity { private static final String TAG = CompletableObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Completable won"t emit any item, instead it returns * Success or failure state * Consider an example of making a PUT request to server to update * something where you are not expecting any response but the * success status * - * Completable : CompletableObserver */ // TODO - link to Retrofit tutorial Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_completable_observer); Note note = new Note(1, "Home work!"); Completable completableObservable = updateNote(note); CompletableObserver completableObserver = completableObserver(); completableObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(completableObserver); } /** * Assume this making PUT request to server to update the Note */ private Completable updateNote(Note note) { return Completable.create(new CompletableOnSubscribe() { Override public void subscribe(CompletableEmitter emitter) throws Exception { if (!emitter.isDisposed()) { Thread.sleep(1000); emitter.onComplete(); } } }); } private CompletableObserver completableObserver() { return new CompletableObserver() { Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output

onSubscribeonComplete: Note updated successfully!

5. Flowable & Observer

Được sử dụng khi một Observable tạo ra số lượng lớn các sự kiện / dữ liệu mà Observer có thể xử lý. Flowable có thể được sử dụng khi nguồn tạo ra 10k+ sự kiện và Onserver không thể tiêu thụ tất cả.Flowable sử dụng phương pháp Backpressure để xử lý dữ liệu tránh lỗi MissingBackpressureException và OutOfMemoryError.

public class FlowableObserverActivity extends AppCompatActivity { private static final String TAG = FlowableObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Simple example of Flowable just to show the syntax * the use of Flowable is best explained when used with BackPressure * Read the below link to know the best use cases to use Flowable operator * https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#when-to-use-flowable * - * Flowable : SingleObserver */ Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_flowable_observer); Flowable flowableObservable = getFlowableObservable(); SingleObserver observer = getFlowableObserver(); flowableObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .reduce(0, new BiFunction() { Override public Integer apply(Integer result, Integer number) { //Log.e(TAG, "Result: " + result + ", new number: " + number); return result + number; } }) .subscribe(observer); } private SingleObserver getFlowableObserver() { return new SingleObserver() { Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e.getMessage()); } }; } private Flowable getFlowableObservable() { return Flowable.range(1, 100); } Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output