iOS · reactive programming · ReactiveX

Sử dụng Reactive Programming trong Swift – Phần 3 – Bất đồng bộ và song song trong ReactiveX

Trong phần 1 và phần 2, mình đã giới thiệu về Rx và một số toán tử trong Rx. Khi mới bắt đầu làm quen và sử dụng Rx, chúng ta thường không chú ý nhiều tới việc asynchronous, parallel hoạt động thế nào trong Rx, cũng như cách Rx quản lý và giải phóng bộ nhớ. Chính điều này gây ra khá nhiều vấn đề khi sử dụng Rx, chẳng hạn: leak memory, performance, trùng lặp dữ liệu, sự kiện,… thậm chí sử dụng sai mục đích các toán tử (breaking contract)

Lập trình bất đồng (asynchronous)
Xử lý song song
Quản lý tài nguyên, bộ nhớ

1. Lập trình bất đồng bộ trong Rx

Như mình đã đề cập ở phần trước, observable thường được sử dụng cho các tác vụ đòi hỏi nhiều thời gian thực thi nên việc lập trình bất đồng độ gần như bắt buộc. Nhắc lại, trong ReactiveX, mặc định observable và chuỗi các toán tử (operations) được thực thi trong cùng một thread, đó chính là thread của observer. Ví dụ:

func createObservable() -> Observable {
    return Observable.create { o in
        print("create on \(Thread.current)")
        Thread.sleep(forTimeInterval: TimeInterval(5))
        o.onNext(1)
        o.onCompleted()
        return Disposables.create()
    }
}

let o = createObservable().map { "item: \($0)" }
o.subscribe(onNext: { text
    print("\(text) on \(Thread.current)))
})

no_observeon

(Hình: Thomas Nield)

Trong ví dụ trên, hàm create, map, subscribe sẽ được chạy trong cùng main thread và block main thread tới khi toàn bộ tác vụ hoàn thành. Điều này có nghĩa là main thread thực thi các lệnh bên trong observable, phát ra các tín hiệu thông qua operator map tới subscribe trên cùng main thread.

1.1 – Sử dụng callback

Thông thường nếu chúng ta muốn một observable phát ra các tín hiệu bất động bộ, chúng ta có thể làm như ví dụ sau:

func createObservable() -> Observable {
    return Observable.create { o in
        let request = Alamofire.request(url)
        request.responseJSON { data in
            o.onNext(data)
            o.onCompleted()
        }
        return Disposables.create {
            request.cancel()
        }
    }
}

Cách này thường được dùng kết hợp với các thư viện sử dụng callback như Alamofire, URLSession, … các thư viện này đã hỗ trợ bất đồng bộ, nên chúng ta có thể tận dụng.

1.2 – Scheduler – Xử lý bất đồng bộ theo tư duy Rx

Xem xét ví dụ sau, các tác vụ đang được chạy đồng bộ trên cùng một thread. Sau đó chúng ta thấy thao tác map tính toán phức tạp cần nhiều thời gian, nó nên được chạy trên một thread khác, hoặc chúng ta muốn các tín hiệu được phát ra trên một thread khác. Nhưng tại thời điểm khai báo biến o, các toán tử of, map chưa được thực thi, nên việc áp dụng lập trình bất đồng bộ như ở phần 1.1 rất khó khăn và không hợp lý.

let o = Observable.of(1...5)
.map { intenseCalculation($0) }

May mắn là trong Rx, chúng ta có thể chỉ định các tác vụ thực thi trên các thread cụ thể thông qua subscribeOn và observeOn. Ngoài ra, một số hàm nhận tham số đầu vào là một scheduler như: throttle, take(duration), …. cho phép chuyển thread bên trong nó.

1.2.1 – subscribeOn:

Sử dụng để chỉ định thread mà observable sẽ sử dụng để thực thi và phát các tín hiệu. Trong ví dụ sau, tất cả các lệnh bên trong hàm create sẽ được thực thi tại background thread.

func createObservable() -> Observable {
    return Observable.create { o in
        print("create on \(Thread.current)")
        Thread.sleep(forTimeInterval: TimeInterval(5))
        o.onNext(1)
        o.onCompleted()
        return Disposables.create()
    }
}

createObservable()
    .subscribeOn(Schedulers.background)
    .map { "item: \($0)" }
    .subscribe(onNext: { text in
        print("\(text) on \(Thread.current)")
    })

Chúng ta có thể khai báo subscribeOn tại bất kỳ vị trí nào (trước khi subscribe) trong chuỗi toán tử. Nếu chúng ta khai báo nhiều hàm subscribeOn thì chỉ có một khai báo đầu tiên (tính từ trái qua phải) được áp dụng, các toán tử còn lại sẽ bị bỏ qua.

createObservable()
    .map { "item: \($0)" }
    .subscribeOn(Schedulers.background) // có thể trước hoặc sau map
    .subscribe(onNext: { text in
        print("\(text) on \(Thread.current)")
    })
createObservable()
    .subscribeOn(Schedulers.background) // được sử dụng  
    .map { "item: \($0)" }
    .subscribeOn(Schedulers.main) // không có tác dụng
    .subscribe(onNext: { text in
        print("\(text) on \(Thread.current)") 
    })

Vì lý do ở trên mà khi áp dụng subscribeOn cho Observable.interval sẽ không có tác dụng vì nội tại hàm interval đã được khai báo subscribeOn background.

Tóm lại, subscribeOn được sử dụng cho observable để xác định thread mà các tín hiệu sẽ được phát ra. Mặc dù vậy, các tín hiệu có thể bị điều hướng sang thread khác nếu chúng ta khai báo toán tử observeOn.

1.2.2 – observeOn:

observeOn sử dụng để chuyển các tín hiệu sang một thread khác. observeOn được áp dụng cho tất cả các toán tử phía sau observeOn cho tới khi gặp observeOn khác.

observeon

(Hình: Thomas Nield)

Ở ví dụ phần 1.2.1, các tín hiệu được phát ra từ background thread, nếu chúng ta muốn cập nhật UI, chúng ta phải chuyển lên main thread. Như vậy, chúng ta có thể sử dụng observeOn như sau:

createObservable()
    .subscribeOn(Schedulers.backgroundDefault)  
    .map { "item: \($0)" }
    .observeOn(Schedulers.main)
    .subscribe(onNext: { text in
        print("\(text) on \(Thread.current)") 
        // update UI
    })

Một vấn đề chúng ta có thể gặp phải khi sử dụng observeOn là backpressure. Xem hình phía trên, stream phía dưới (operator: 1, 2) thực thi ở background thread, nên có thể phát ra các tín hiệu nhanh hơn stream phía trên (operator: 3, 4) có thể xử lý. Khi đó, các tín hiệu (dữ liệu) sẽ không được xử lý và giải phóng, dẫn tới tràn bộ nhớ. Việc lưu trữ này nằm ở stream phía dưới, làm cho hiệu suất hoạt động stream phía dưới giảm, nên được gọi là backpressure.

bottleneck

(Hình: goleansixsigma.com)

Rx cung cấp một số toán tử để tránh việc này như: buffer, throttle, debounce. Về mặt ý tưởng là hạn chế phát ra các tín hiệu trong một khoảng thời gian nhất định.

Tóm lại, chúng ta có thể sử dụng subscribeOn một lần duy nhất, nhưng chúng ta có thể sử dụng nhiều observeOn khi chúng ta cần chuyển stream từ thread này qua thread khác. Nhưng chúng ta cũng không nên lạm dụng việc này, chỉ sử dụng khi cần phải tính toán nhiều.

2. Xử lý song song trong Rx

Chúng ta vừa tìm hiểu về lập trình bất đồng bộ (asynchronous, multi-thread). Mặc dù vậy, các tín hiệu trong một stream vẫn được xử lý tuần tự trên một thread. Câu hỏi là làm thế nào để xử lý song song các tín hiệu trong Rx, có nghĩa là các tín hiệu được xử lý cùng một thời điểm trên các thread khác nhau.

let o = Observable.of(1...10)
.map { intenseCalculation($0) }
.subscribeOn(Schedulers.background)

o.subscribe(onNext: { i in
    print(i)
})

func intenseCalculation(i: Int) -> Int {
    Thread.sleep(forTimeInterval: TimeInterval(5))
}

Ở ví dụ trên, các tín hiệu đã được xử lý ở thread khác, đảm bảo ứng dụng không bị treo khi thực thi. Nhưng các tín hiệu vẫn được xử lý tuần tự từ 1 đến 10.

Khi mới tiếp cận với Rx, để giải quyết bài toàn này, mình phát ra các tín hiệu song song trên nhiều thread như ví dụ sau:

func createObservable() -> Observable {
    return Observable.create { o in
        for i in 1..10 {
            DispatchQueue.init(label: "\(i)") {
                o.onNext(i)
            }
        }
        return Disposables.create()
    }
}

Việc này không đúng với Observable Contract: các tín hiệu phải được phát ra trong cùng một thread. Trước khi giải quyết bài toán song song, mình thấy có hai câu hỏi ở ví dụ trên:

1 – Khi nào phát ra tín hiệu completed, chuyện gì xảy ra nếu phát tín hiệu complete, khi các thread chưa hoàn thành.

2 – Tại sao không tách mỗi thread phát ra tín hiệu là một observable riêng biệt, trong khi nó có đủ các yếu tố có thể sử dụng observable(tốn thời gian, cô lập)?

Để giải quyết các vấn đề trên, chúng ta sử dụng flatMap. Như chúng ta đã biết, mỗi tín hiệu sẽ tạo ra một observable riêng biệt, sau đó các observable sẽ được merge lại thành một stream duy nhất. Chúng ta sử dụng subscribeOn để thực thi các observable đó trên các thread khác nhau.

Observable.of(1...10)
    .flatMap { i in
        Observable.just(i)
        .subscribeOn(Scheduler.background)
        .map { intenseCalculation($0) 
    }
    .subscribe(onNext: { i in
        print(i)
    })

Tóm lại, để xử lý song song, observable không nên phát các tín hiệu song song, mà nên kết hợp flatMap và subscribeOn hoặc observerOn để giải quyết.

3. Quản lý và giải phóng tài nguyên, bộ nhớ

Trong Rx, ngoài việc tạo ra các đối tượng observable, observer,… để phục vụ các mục đích riêng của Rx. Trong một số trường hợp, Rx còn giữ lại một số tín hiệu(dữ liệu) bên trong nó để phục vụ cho các mục đích replay, buffer, zip, takeLast, … Nếu sử dụng không đúng cách các đối tượng này sẽ không được giải phóng, dẫn tới tràn bộ nhớ hoặc leak memory,…

Các đối tượng trên sẽ được huỷ bỏ tham chiếu tới nó khi một trong số sự kiện sau xảy ra:

onError()
onCompleted()
dispose() (takeUtil cũng gọi dispose)

Như vậy, với các observable phát các tín hiệu hữu hạn (finite observable) như just, of, range, … việc không gọi dispose có thể không gây resources leak.

Nhưng với các observable không có tín hiệu kết thúc (infinite observable), nếu không gọi hàm dispose, sẽ gây ra resources leak. Ví dụ:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }
Thread.sleep(forTimeInterval: 2.0)
subscription.dispose()

Nhưng việc gọi dispose() tường mình như vậy không phải là ý tưởng tốt (code smell). Chúng ta nên sử dụng DisposeBag hoặc takeUntil như ví dụ sau:

final class ViewController: UIViewController {
    private let disposeBag = DisposeBag()
    
    override viewDidLoad() {
        super.viewDidLoad()
        Observable<Int>.interval(0.3, scheduler: scheduler)
        .subscribe { event in
            print(event)
        }.addToDisposeBag(disposeBag)
    }
}

disposeBag chứa danh sách các subscription, khi ViewController bị huỷ, disposeBag sẽ bị huỷ. Lúc đó, disposeBag sẽ lặp qua các subscription bên trong và gọi hàm dispose() của mỗi subscription đó.

Khi khởi tạo một observable, chúng ta thường trả về một đối tượng Disposable, khi hàm dispose() được gọi các lệnh bên trong Disposables.create sẽ được thực thi. Cho nên chúng ta nên chú ý đặt các lệnh để giải phóng bộ nhớ (nếu cần) tại đây.

func createObservable() -> Observable {
    return Observable.create { o in
        let request = Alamofire.request(url)
        request.responseJSON { data in
            o.onNext(data)
            o.onCompleted()
        }
        return Disposables.create {
            // sẽ được thực thi khi hàm dispose() được gọi
            request.cancel()
        }
    }
}

Một lưu ý là hàm flatMapLatest, cancel các observable đang được thực thi thông qua việc gọi dispose().

Khi nào nên sử dụng DisposeBag?

Khi mình đọc trên stackoverflow, khuyên rằng với các infinite observable thì chúng ta cần phải sử dụng DisposeBag, còn finite observable thì không cần. Nhìn chung, điều này là đúng, nhưng ta cần hiểu chính xác hơn. Ví dụ:

final class ViewController: UIViewController {
    private let disposeBag = DisposeBag()
    
    override viewDidLoad() {
        super.viewDidLoad()
        Observable.just(1)
        .map { intenseCalulation($0) }
        .subscribe { event in
            print(event)
        }.addToDisposeBag(disposeBag)
    }
}

Open ViewController -> hàm intenseCalculation đang  thực thi -> thoát khỏi ViewController. Nếu chúng ta muốn dừng intenseCalculation, cũng như hủy toàn bộ bộ nhớ liên quan khi thoát khỏi view, thì chúng ta nên sử dụng addToDisposeBag. Nếu chúng ta muốn, khi nào tác vụ hoàn thành, mới huỷ thì chúng ta không dùng  addToDisposeBag

Nhìn chung, chúng ta nên sử dụng addToDisposeBag cho tất cả trường hợp infinite observable. Với finite observable thì chúng ta có thể cân nhắc như ví dụ trên.

Kết

Trong bài này mình đã giới thiệu với các bạn cách xử lý bất đồng bộ, song song, cũng như quản lý tài nguyên bộ nhớ trong Rx. Mình tạm dừng chủ đề Rx ở đây, và chuyển qua chủ đề mới. Nếu có thời gian mình sẽ viết thêm một số bài về Subject: PublishSubject, BehaviorSubject, ReplaySubject, Share and Multicast trong Rx.

2 thoughts on “Sử dụng Reactive Programming trong Swift – Phần 3 – Bất đồng bộ và song song trong ReactiveX

  1. sonhei com minha esposa toda machucada parecia que ela estava atprleoada ou espancada ela aparecia do nada andando e eu chamava por seu nome e ela passava com olhar triste parecia uma morta viva e não me respondia, foi muito ruim a este sonho e quando percebi que não poderia ser verdade em seguida me acordei e asustado na mesma hora eu agarei ela e tive muito medo de algo acontecer com ela pois eu a amo muito.

    Số lượt thích

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất /  Thay đổi )

Google photo

Bạn đang bình luận bằng tài khoản Google Đăng xuất /  Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất /  Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất /  Thay đổi )

Connecting to %s