iOS · reactive programming · ReactiveX

Sử dụng ReactiveX trong Swift – Phần 2

Trong phần trước, mình đã giới thiệu về Functional Reactive Programming, Stream, cũng như một số lợi ích của nó mang lại cho việc phát triển ứng dụng đòi hỏi nhiều tương tác với người dùng. Trong phần này chúng ta sẽ tìm hiểu sâu hơn một số khái niệm trong Functional Reactive Programming, đồng thời thực hiện một ứng dụng sử dụng ReactiveX.

Reactive Programming
Functional Programming
Observable, Operations
Sử dụng ReactiveX

1 – Tại sao là Functional Reactive Programming.

Functional Reactive Programming là sự kết hợp giữa Reactive Programming và Functional Programming.

1.1 – Reactive Programming

Theo Wikipedia, Reactive Programming là mô hình lập trình hướng tới các luồng dữ liệu và sự phản hồi thay đổi. Nói cách khác, khi nào có sự thay đổi, các bên sẽ chủ động phản hồi cho nhau thông qua các luồng dữ liệu (Stream). Ví dụ trong mô hình MVVM, khi View Model có sự thay đổi, View Model sẽ chủ động phản hồi để view cập nhật lại trạng thái và ngược lại, quá trình này có thể lập lại nhiều lần.

Mvvm Reactive

(Hình: Andre Saltz)

Trong thế giới lập trình, chúng ta hiện thực điều này bằng cách tạo ra hai đối tượng: Observable (có thể quan sát) và Observer (quan sát),  kết nối với nhau thông qua một Stream như đã đề cập trong bài trước. Như vậy Stream đóng vai trò hạt nhân trong Reactive Programming, tất cả mọi tác nhân đều có thể là Stream.

Everything is strea

(Hình: Andre Saltz)

Nhấn nút: --tap--tap--tap--tap--> Stream
Gõ bàn phím: --r--e--a--c--t--i--v--e--> Stream
Kết quả trả về API: --Api response--> Stream
Tập dữ liệu bất kỳ: --1--2--3--> Stream
Tín hiệu mạng: --true--false--true--> Stream

1.2 – Functional Programming

FP là mô hình lâu đời nhất trong bộ ba mô hình lập trình: RF, Procedure và OOP. FP xem chương trình là một tập hợp các hàm nhận vào đối số và trả về giá trị. Trong bài này mình sẽ không đi sâu vào FP, chỉ tập trung vào một vài điểm mạnh chính của FP, khiến nó trở thành công cụ hữu hiệu trong RP.

1.2.1 – Pure Function:  Là một hàm mà kết quả trả về chỉ phụ thuộc vào tham số truyền vào, trong quá trình thực thi không thay đổi state của đối tượng bên ngoài. Ví dụ:

func length(x: String) { // Pure function
    return x.length
}

func length(x: String) { // Not pure function
    globalVar = x.length 
    return globalVar
}

Pure function giúp cho việc xử lý song song dễ dàng hơn, tránh được bottleneck tại thao tác thay đổi state, vì khi nhiều threads cần thay đổi chung một state sẽ dẫn tới đụng độ, chúng ta cần lock state và thay đổi tuần tự. Việc thay đổi state được gọi là Side-Effects, cần được xử lý riêng biệt. Ví dụ trong ReactiveX, cung cấp các hàm doOn (next, error, completed), để xử lý side effect riêng biệt. Vì lý do này, khi sử dụng ReactiveX, chúng ta cần lưu ý tách biệt side effect ra khỏi khác hàm chuyển đổi (map, flatMap, reduce, …)

Ngoài ra, pure function cũng không thay đổi tham số truyền vào.

1.2.2 – High Order Function: Một hàm có thể dùng làm tham số đầu vào cho một hàm khác, hoặc kết quả trả về là một hàm. Tính chất này, giúp RP có thể định nghĩa một tác vụ mà không thực thi nó tại thời điểm định nghĩa. Thông qua nhiều lớp (nested callback), chúng ta có một chuỗi các tác vụ, chẳng hạn: o = f(g(x)). Tác vụ chỉ thực hiện khi nó được subscribe tới.

1.2.3 – Composition (Map, FlatMap, Reduce, …): Cho phép chuyển đổi từ Stream này qua một Stream khác không làm thay đổi trạng thái của Stream ban đầu.

composition-of-function-image

Ví dụ nhân đôi tất cả các giá trị trong mảng, thay vì sử dụng vòng lặp nhân đôi từng giá trị của mảng, chúng ta cung cấp cho trình biên dịch một hàm cho phép chuyển đổi các phần tử f(x) = x * 2. Kết quả trả về là một mảng mới, mảng ban đầu không thay đổi giá trị.

[1, 2, 3].map { $0 * 2 }

2. Observable – Nơi khởi nguồn stream

Như mình vừa đề cập ở trên Observer lắng nghe tới Observable, và nhận được kết quả phát ra từ Observable.

Chúng ta đã quen thuộc với việc thực thi một tác vụ, chờ và nhận kết quả của tác vụ sau khi tác vụ kết thúc. Nhưng với ReactiveX, thay vì thực thi tác vụ ngay thời điểm nó được gọi, chúng ta tạo ra một tác vụ cho phép các tác vụ khác lắng nghe và nhận kết quả sau đó. Như vậy, Observable thường sử dụng cho các tác vụ tốn thời gian, đòi hỏi bất đồng bộ.

func getData() -> Observable<[Profile]>{
    Observable.create { observer in
        API.getUsers() { user in
            // async
            observer.onNext(users)
            observer.onCompleted()
        }
        return Disposables.creates {
           API.cancel ()
        }
    }
}

let o = getData()

Sau lệnh gọi hàm getData(),  Observable mới chỉ được khởi tạo, các lệnh gọi API chưa được thực thi. Nó chỉ được thực thi khi có lệnh o.subscribe(). Thực chất lúc này o chỉ là một biến hàm o = f(x), trong đó x là một đối tượng Observer. Khi có lệnh subscribe, f(x) sẽ được gọi với tham số x chính là subscriber

2.1 – Khởi tạo Observable

ReactiveX cung cấp một số hàm để khởi tạo stream

create, deferred, empty, never, of (from), interval, just, range

Chúng ta sẽ tìm hiểu một số hàm thông dụng: create, deferred, of, just, interval.

2.1.1 – create: Khởi tạo một Observable

Observable.create { observer in
   // hiện thực logic (thường là asynchronous: API, database, ...)
   // phản hồi thông qua: onNext, onError, onCompleted 
   observer.onNext(data)
   observer.onComplete()
   return Disposables.create {
      // sẽ được thực thi khi có lỗi, hoặc kết thúc stream.
      // giải phóng bộ nhớ(n).
   }
}

onNext: phản hồi sự thay đổi cho subscriber, có thể gọi nhiều lần.

onError: phản hồi lỗi cho subscriber, stream sẽ kết thúc khi onError được gọi.

onCompleted: chỉ gọi 1 lần khi muốn kết thúc stream.

2.1.2 – deferred: deferred chỉ khởi tạo Observable khi được subscribe tới. Trong ví dụ sau, chúng ta sử dụng deferred, biến x sẽ được khởi tạo khi có subscribe, nên x sẽ có giá trị mới nhất.

let o: Observable = Observable.deffered {
   let x: String = loadInfo() 
   // mục tiêu: biến x có giá trị mới nhất khi tạo Observable
   return createObservable(x)
}

func createObservable(x: String) -> Observable {
    return Observable.create {
       // khởi tạo Observable.
    }
}

2.1.3 – of: khởi tạo stream từ danh sách đối tượng dữ liệu.

Observable.of(1, 2, 3) // tạo ra 3 events 1, 2, 3

2.1.4 – just: khởi tạo stream từ một đối tượng dữ liệu bất kỳ.

Observable.just([1, 2, 3]) // tạo ra 1 event với giá trị [1, 2, 3]

2.1.5 – interval: khởi tạo Observable phát ra tín hiệu theo chu kỳ

let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .background)
Observable.interval(60, scheduler: scheduler)

Lưu ý: Trong tất cả các hàm khởi tạo observable, khi subscribe bao nhiêu lần thì từng đó stream riêng biệt được tạo ra. Trong ví dụ sau, đoạn code “API.getUser()” sẽ được thực thi 2 lần và ở đây có 2 stream hoàn toàn riêng biệt. Nếu muốn 2 lệnh subscribe dùng chung 1 stream thì chúng ta phải sử dụng share.

let o: Observable = Observale.create { ob
    API.getUser() { user in
        ob.onNext(user) 
        ob.onCompleted()
    }
}

o.subscribe()
o.subscribe()

// chia sẻ 1 stream cho nhiều subscriber
let share = o.share()
share.subscribe()
share.subscribe()

3 – Operations – Phép thuật của ReactiveX

Trong phần này mình sẽ giới thiệu một số operations thông dụng dùng để chuyển đổi stream. Một lưu ý là khi sử dụng các tác vụ chuyển đổi từ stream cũ qua stream mới, thì stream cũ không bị thay đổi. Việc stream mới phát tín hiệu error hoặc onCompleted không làm kết thúc stream cũ, chỉ ảnh hưởng tới các stream tiếp theo.

3.1 – Transformation: map, flatMap, flatMapFirst, flatMapLatest, …

3.1.1 – map: chuyển đổi mỗi phần tử thành một phần tử mới, stream gốc có bao nhiêu phần tử thì stream mới có chừng đó phần tử.

func getUsers() -> Observable<[Profile]>
// giả sủ chúng ta muốn lấy danh sách userId
getUsers().map { $0.userId }

3.1.2 – flatMap: thường sử dụng khi muốn thực hiện một chuỗi Observable tuần tự. flatMap chuyển đổi mỗi phần tử của một Observable gốc thành một Observable mới, sau đó kết hợp tất cả các Observable mới thành một Observable. Có thể hiểu như sau: Mỗi tín hiệu phát ra từ Observable gốc vừa là tham số đầu vào, vừa là trigger của một Observable mới. Kết quả là một tập hợp các Observable, sau đó các Observable đó sẽ được kết hợp(merge) thành một Observable duy nhất.

Flat map

(Hình: reactivex.io)

// gọi API async, trả về token nếu login thành công 
func login(usn: String, pwd: String) -> Observable
// gọi API async, trả về thông tin người dùng
func getUserProfile(token: String) -> Observable

button.rx.tap.asObservable().login("usn", "pwd").flatMap { token in 
    getUserProfile(token)
}.subscribe(onNext: {
    // hiển thị profile
})

Ở ví dụ trên, khi user nhấn login button, hàm login sẽ được thực thi và trả về một token, nó sẽ được dùng để làm tham số đầu cho hàm getUserProfile. Nếu hàm login bị lỗi, thì hàm getUserProfile sẽ không được gọi.

Tại sao cần observable ở ví dụ trên?

Trước hết, chúng ta phân tích tại sao chúng ta cần Observable ở trong ví dụ trên: login và getUserProfile là hai hàm cần phải kết nối với API, việc này đòi hỏi phải mất một khoảng thời gian nhất định, nên chúng ta phải lập trình bất đồng bộ tránh việc lock UI, đồng thời cần một cơ chế để lắng kết quả và phản hồi cho người dùng. Như vậy, observable hoàn toàn phù hợp với các yêu cầu trên.

Sự khác biệt map và flatMap?

Tiếp theo, chúng ta tìm hiểu sự khác biệt giữa flapMap, map và khi nào sử dụng hai toán tử này.

Trong ví dụ ở phần map, sau khi thực thi hàm getUsers, ta nhận được danh sách người dùng và muốn chuyển đổi kết quả nhận được thành một kết quả khác, không thực thi tác vụ nào khác. Như vậy map dùng để chuyển đổi các đối tượng dữ liệu, từ đối tượng A thành B.

Trong ví dụ ở phần flatMap, login và getUserProfile là hai tácc vụ cần thực hiện tuần tự, subscriber chỉ quan tâm tới kết quả của tác vụ cuối cùng. Như vậy, flatMap dùng để kết nối các tác vụ cần thực hiện tuần tự. Các tác vụ trung gian có thể huỷ khi thực hiện xong, kết quả cuối cùng sẽ được trả về cho bên lắng nghe.

Sự khác biệt flatMap và callback?

Nếu không sử dụng flatMap và Rx chúng ta có thể viết như sau:

let LoginHandler = (Profile?, Error?) -> Void
login(usn, pwd, completion: LoginHandler) { (token, loginError) in
    if loginError != nil {
        completion(nil, loginError)
    }
    getUserProfile(token) { profile, error
        completion(profile, error)
    }
}

Mọi việc sẽ trở nên phức tạp hơn nếu có nhiều hơn các tác vụ cần thực hiện. Chúng ta sẽ thấy sự khác biệt của flatMap so với cách làm thông thường (callback) qua ví dụ sau: A nhờ B làm tác vụ, B chỉ làm được một phần, phần còn lại B nhờ C làm (tất cả các tác vụ cần thời gian để thực thi). Với callback, B phải đứng chờ, lắng nghe C, và phản hồi kết quả của C cho A. Ngược lại, khi sử dụng flatMap, sau khi B xong, đưa cho C làm và nói với C, sau khi hoàn thành thì trả lại kết quả cho A. Vì thế, có thể huỷ B ngay sau khi nó thực hiện xong.

3.1.2 – flatMapFirst: Trong ví dụ trên nếu chúng ta muốn khi hàm login đang được thực thi, nếu user nhấn button lần nữa thì bỏ qua, nếu như hàm login chưa được thực thi hoặc đã thực thi tại thời điểm user nhấn button, thì thực thi hàm login.

button.rx.tap.asObservable()
.flatMapFirst { _ in
    login("usn", "pwd")
}.flatMapFirst { token in 
    getUserProfile(token)
}.subscribe(onNext: {
    // hiển thị profile
})

Chú ý: Nếu ở lần flatMap thứ 2 nếu không sử dụng flapMapFirst chúng ta vẫn có thể nhận được 2 tín hiệu (nhắc lại: vì ở đây là 2 stream riêng biệt, nên được đối xử riêng biệt).

3.1.3 – flatMapLatest: khi một tín hiệu tới, nếu không có tín hiệu nào đang được xứ lý, tín hiệu đó sẽ được xử lý để chuyển đổi qua Observable. Nếu có một tín hiệu nào đang được xử lý, thì cancel quá trình xử lý tín hiệu cũ và xử lý tín hiệu mới. Về mặt hình thức thì flatMapFirst, flatMapLatest khá giống nhau, chúng ta ưu tiên sử dụng flatMapLatest khi muốn có dữ liệu mới nhất.

button.rx.tap.asObservable()
.flatMapLatest { _ in
    login("usn", "pwd")
}.flatMapLatest { token in 
    getUserProfile(token)
}.subscribe(onNext: {
    // hiển thị profile
})

3.2 – Filter

Nếu ai đã từng sử dụng LINQ thì phần này khá quen thuộc: skip, take, first, last, filter,..

func getUsers() -> Observable
getUsers().filter { $.name == "Peter" } 
getUsers().takeLast() // phần tử cuối cùng
getUsers().take(1) // lấy 1 phần tử đầu tiên
getUsers().skip(3).take(1) // bỏ qua 3 phần tử đầu, lấy phần tử thứ 4
// chỉ trả về các phần tử khác phần tử trước đó. Ví dụ: 1-2-1
getUser().map { $0.name }.distinctUntilChanged()

Lưu ý: đối với các hàm takeLast, take(n), takeUntil, kết quả trả về là 1 stream mới, khi lấy đủ số phần tử thì stream sẽ kết thúc.

3.4 – Combine: zip, combineLastest, merge

Khi muốn xử lý song song nhiều tác vụ chúng ta có thể sử dụng zip, combineLatest, merge

3.4.1 – combineLatest: kết hợp nhiều stream thành một stream, phần tử mới là sự kết hợp phần tử mới nhất của mỗi stream, và được phát ra khi một stream bất kỳ phát ra tín hiệu mới. Các observable không cần cùng kiểu dữ liệu. Tín hiệu đầu tiên được phát ra sau khi tất cả các observable phát ra ít nhất 1 tín hiệu.

Rx combineLatest

(Hình: rxmarvles)

let o1 = userName.rx.text.asObservable() // return Observable
let o2 = password.rx.text.asObservable() // return Observable
Observable.combineLatest(o1, o2) { (i1, i2) -> Bool in
    !i1.isEmpty && !i2.isEmpty
}.subscribe { valid in
    loginButton.enabled = valid
}.addDisposableTo(disposeBag)

Khi nào sử dụng combineLatest?

Khi chúng ta cần dữ liệu đầu của tất cả các tác vụ để thực hiện một tác vụ mới. Kể từ đó, khi có sự thay đổi ở bất kỳ tác vụ nào, thì sử dụng dữ liệu mới nhất để kích hoạt tác vụ mới. Ở ví dụ trên ta cần userName để quyết định ẩn/hiện loginButton. Khi userName hoặc password thay đổi thì kiểm tra ẩn/hiện.

3.4.2 – zip: kết hợp nhiều stream thành một stream, 1 phần tử của stream mới là sự kết hợp của các phần tử ở vị trí tương ứng của mỗi stream. Một phần tử mới chỉ được phát ra sau khi các phần tử ở vị trí tương ứng của tất cả các stream được phát ra. Các observable không cần cũng kiểu dữ liệu.

reactive zip

(Hình: rxmarbles.com)

let users = [user1, user2, user3,...]
let o1 = Observable.of(users).flatMap { getPhoto($0) }
let o2 = Observable.of(users).flatMap { getProfile($0) }
Observable.zip(o1, o2) { (i, p) -> (UIImage, Profile) in
    (i, p)
} // kết quả là Observable<[(Image, Profile)]>

Khi nào nên sử dụng zip?

Giả sử chúng ta có danh sách users, mỗi user chúng ta cần lấy hai thông tin từ API là: image và profile. Như vậy chúng ta cần lặp qua danh sách user, mỗi user chúng ta thực thi đồng thời hai hàm getProfile và getPhoto. Trong trường hợp này, có thể sử dụng zip như ví dụ trên.

Chú ý: khi sử dụng zip có thể gây ra vấn đề backpressure. Có nghĩa là một observable phát ra các tín hiệu thường xuyên hơn observable còn lại, các phần tử đó được giữ lại, dẫn tới bộ nhớ không được giải phóng.

3.4.3 – merge: kết hợp tất cả các phần tử của nhiều Observable thành 1 Observable, các phân tử được đan xen theo trình tự thời gian. Hai observable phải cũng kiểu dữ liệu

reactivex merge

(Hình: rxmarbles.com)

let observable1 = Observable.of(1, 2, 3)
let observable2 = Observable.of(4, 5, 6)
Observable.of(observable1, observable2).merge()

3.4.4 – concat: nối tiếp tất cả các phần tử của observable thứ 2 với các observable thứ nhất. Các tín hiệu của các observable không đan xen nhau theo thời gian. Hai observable phải cũng kiểu dữ liệu

reactivex- concat

(Hình: rxmarbles.com)

let observable1 = Observable.of(1, 2, 3)
let observable2 = Observable.of(4, 5, 6)
observable1.concat(observable2) // kết quả: 1, 2, 3, 4, 5, 6

Khác với zip, merge, combineLatest các observable được thực thi song song, concat thực thi tuần tự các observable. Nghĩa là khi observable1 kết thúc, thì observable2 mới được kích hoạt. Nếu observable1 có error thì observable2 không được thực thi. Cho nên, khi sử dụng concat thì observable1 bắt buộc phải có tín hiệu onCompleted, nếu không observable1 không được thực thi.

onError, onCompleted: Với tất cả các toán tử dùng để kết hợp stream ở trên, onError của stream kết hợp, được phát ra khi một trong số stream có error, đồng thời stream đó sẽ bị kết thúc. Còn onCompleted được phát ra khi tất cả các Stream phát ra tín hiệu onCompleted.

4 – Sử dụng ReactiveX tạo ứng dụng Timeline.

Trước hết, chúng ta cần thêm các thư viện ReactiveX vào project bằng cách sử dụng CocoaPods. Ngoài RxSwift và RxCocoa của ReactiveX, mình sử dụng thêm Alamofire để đơn giản hoá việc kết nối tới API, giúp chúng ta tập trung vào việc sử dụng ReactiveX: Social Timeline

pod 'RxSwift',    '3.0.0-beta.2'
pod 'RxCocoa',    '3.0.0-beta.2'
pod 'Alamofire', '~> 4.0'

Tiếp theo, chúng ta sẽ kết hợp ReactiveX và Alamofire tạo ra lớp BaseAPIService chứa các hàm thực hiện việc request tới API, chuyển đổi từ JSON sang các đối tượng dữ liệu (models) tương ứng.

import RxSwift
import RxCocoa
import Alamofire

enum ServiceError : Error {
    case InvalidJSON
}

typealias JSON = [String: AnyObject]
protocol JSONDecodable {
    init(dict: [String: AnyObject]) throws
}

class BaseAPIService {
 func rx_requestArray(url: String) -> Observable<[T]> {
     return rx_requestJSON(url: url).map { 
        try $0.flatMap { try T.init(dict: $0) } 
     }
 }

 func rx_requestObject(url: String) -> Observable {
    return rx_requestJSON(url: url).map { 
       try T.init(dict: $0.first ?? [:]) 
    }
 }

 func rx_requestJSON(url: String) -> Observable {
     return Observable.create { o in
        let request = Alamofire.request(url)
            .validate(statusCode: 200..<300)
            .validate(contentType: ["application/json"])
        request.responseJSON { data in
            guard let json = data.result.value as? [JSON] else { 
                o.onError(data.result.error ?? ServiceError.InvalidJSON)
                    o.onCompleted()
                    return
                }
                o.onNext(json)
                o.onCompleted()
            }
            return Disposables.create {
                request.cancel()
            }
        }
    }
}

Trước khi thực hiện các phần tiếp theo, chúng ta sẽ cùng phân tích rõ hơn vai trò và lợi ích của ReactiveX trong lớp BaseAPIService ở trên. Nếu như không sử dụng ReactiveX thì hàm BaseAPIService có thể được thực hiện như sau:

typealias JSONHandler = (_ data: [JSON]?, _ error: Error?) -> Void
typealias ArrayHandler = (_ data: [T]?, _ error: Error?) -> Void
typealias ObjectHandler = (_ data: T?, _ error: Error?) -> Void

func rx_requestJSON(url: String, completion: JSONHandler) -> Void {
    let request = Alamofire.request(url)
       .validate(statusCode: 200.. Void { 
    rx_requestJSON(url: url) { (data, error) in 
        do {
           let array = try data?.map { try T.init(dict: $0) }
           completion(array, error)
        }
        catch {
           completion(nil, ServiceError.InvalidJSON)
        } 
    } 
}

func rx_requestObject(url: String, completion: ObjectHandler) -> Void { 
    rx_requestJSON(url: url) { (data, error) in 
        do {
           let object = try data?.map { try T.init(dict: $0) }.first
           completion(object, error)
        }
        catch {
           completion(nil, ServiceError.InvalidJSON)
        } 
    } 
}

Asynchronous: Trong phần giới thiệu FRP ở bài trước, mình có đề cập tới việc ReactiveX tự thân nó không đảm bảo cho việc ứng dụng chạy asynchronous. Chúng ta có thể thấy rõ điều này thông qua ví dụ trên, trong cả hai cách hiện thực, asynchronous được thực hiện bởi hàm responseJSON của Alamofire.

Stream: Trong cách thực hiện thứ hai, không sử dụng ReactiveX, mỗi lần cần chuyển đổi trạng thái của dữ liệu, hoặc cần thêm một layer mới, chúng ta đều phải tạo ra các callback trung gian. Ngoài ra trong quá trình chuyển đổi trạng thái, chúng ta cũng cần phải xử lý các lỗi phát sinh cũng như chuyển tiếp các lỗi từ tầng dưới lên phía trên.

Ngược lại, ReactiveX tạo ra một Stream từ nơi phát sinh tới nơi lắng nghe. Trong quá trình chuyển đổi, nếu có bất kỳ lỗi nào phát sinh, nó sẽ được thêm vào Stream hiện tại (chúng ta có thể chặn việc hoặc chuyển đổi các lỗi này tương tự như một giá trị khác trong stream). Hơn nữa việc sử dụng stream, giúp chúng ta hạn chế việc tạo các state hoặc các callback trung gian.

Chúng ta quay lại thực hiện ứng dụng Timeline sử dụng mô hình MVVM. Chúng ta lần lượt thêm: Model, ViewModel, ViewController:

final class Post: JSONDecodable {
    init(dict: [String: AnyObject]) {
        userId = dict["userid"] as? String ?? ""
        id = dict["id"] as? String ?? ""
        title = dict["title"] as? String ?? ""
        body = dict["body"] as? String
    }
    let userId: String
    let id: String
    let title: String
    let body: String?
}
final class PostAPIService: BaseAPIService {
    private let baseUrl = "https://jsonplaceholder.typicode.com/posts"
    func getPosts(searchText: String) -> Observable {
        let lowerSearchText = searchText.lowercased()
        return rx_requestArray(url: baseUrl).map {
            $0.filter { searchText.isEmpty || $0.title.contains(lowerSearchText) }
            // vì API ko hỗ trợ, nên ví dụ này filter sau khi gọi API
        }
    }
    
    func getPostDetail(id: String) -> Observable {
        return rx_requestObject(url: "\(baseUrl)/\(id)")
    }
}

import RxSwift
import RxCocoa

final class PostViewModel {
    init(service: PostAPIService) {
        self.service = service
    }
    
    func getPosts(searchText: String) -> Observable {
      return service.getPosts(searchText: searchText).map { 
          $0.sorted(by: { $0.title < $1.title })
      }
    }
    
    private let service: PostAPIService
}

Khi user tìm kiếm, để tránh spam server, chúng ta chỉ gửi request sau khoảng 250s (throtle). Nếu request trước chưa hoàn thành, cancel request cũ, gửi request mới (flatMapLatest).

import UIKit
import RxCocoa
import RxSwift

final class PostViewController: UITableViewController {
    override func viewDidLoad() {
        super.viewDidLoad()
        navigationItem.title = NSLocalizedString("Timeline", comment: "")
        setupTableView()
    }

    private func setupTableView() {
        tableView.delegate = nil
        tableView.dataSource = nil
        tableView.register(UITableViewCell.self, forCellReuseIdentifier: "Cell")
        let seachBar = UISearchBar(frame: CGRect(x: 0, y: 0, width: view.frame.width, height: 44))
        tableView.tableHeaderView = seachBar

        let viewModel = self.viewModel
        seachBar.rx.text
        .throttle(0.25, scheduler: Schedulers.backgroundDefault)
        .flatMapLatest { text -> Observable in
            return viewModel.getPosts(searchText: text)
        }
        .catchErrorJustReturn([])
        .bindTo(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (row, post, cell) in
                cell.textLabel?.text = "\(post.title)\n\(post.body)"
         }
         .addDisposableTo(disposeBag)
    }

    var viewModel: PostViewModel!
    private let disposeBag = DisposeBag()
}

Kết quả ta sẽ có màn hình như sau:

ReactiveX timeline

Tải source code cho ứng dụng tại: reactivex sample. (xCode 8, Swift 3.0)

Trong các phần vừa rồi, mình đã giới thiệu về ReactiveX, cũng như cách sử dụng một số toán tử cơ bản trong ReactiveX. Trong bài biết theo, mình sẽ giới thiệu thêm về phần quản lý bộ nhớ, scheduler trong ReactiveX.

One thought on “Sử dụng ReactiveX trong Swift – Phần 2

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất /  Thay đổi )

Google photo

Bạn đang bình luận bằng tài khoản Google Đăng xuất /  Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất /  Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất /  Thay đổi )

Connecting to %s