DEV Community

Yoshinori Imajo
Yoshinori Imajo

Posted on • Edited on

Errors in the stream switched using flatMap are not involved upstream in Combine framework.

The Publisher will terminate the stream in case of an error. However, if the Publisher switched by flatMap fails, the upstream before the switch does not terminate.

(This would be a big difference from RxSwift.)

Using flatMap sample

import Foundation
import Combine

let cancelable = (1...3).publisher
    .handleEvents(receiveOutput: {
        print("πŸ₯· handle output: \($0)")
    }, receiveCompletion: { _ in
        print("πŸ₯· handle completion:")
    }, receiveCancel: {
        print("πŸ₯· handle cancel:")
    })
    .setFailureType(to: NSError.self)
    .flatMap { value -> Future<String, NSError> in
        print("🍏 flatMap: \(value)")
        return Future<String, NSError> {
            guard value == 1 else {
                $0(.failure(NSError(domain: "test", code: 1)))
                return
            }

            $0(.success("\(value)"))
        }
    }
    .sink(receiveCompletion: {
        switch $0 {
        case .finished:
            print("🍎 sink finished:")
        case .failure(let error):
            print("🍎 sink failure:", error)
        }
    }, receiveValue: {
        print("🍎 sink received: \(String(describing: $0)) πŸŽ‰")
    })
Enter fullscreen mode Exit fullscreen mode

This code has the following output:

πŸ₯· handle output: 1
🍏 flatMap: 1
🍎 sink received: 1 πŸŽ‰
πŸ₯· handle output: 2
🍏 flatMap: 2
🍎 sink failure: Error Domain=test Code=1 "(null)"
πŸ₯· handle output: 3
πŸ₯· handle completion:
Enter fullscreen mode Exit fullscreen mode

Notice the πŸ₯· emoji; despite the sink failure, the output will see that the element is handled 3 and even sent to completion.

Using PasshtoughSubject & flatMap sample

Of course, using PassthroughSubject doesn't change the result.

import Foundation
import Combine

let subject = PassthroughSubject<Int, NSError>()

let cancelable = subject
    .handleEvents(receiveOutput: {
        print("πŸ₯· handle output: \($0)")
    }, receiveCompletion: { _ in
        print("πŸ₯· handle completion:")
    }, receiveCancel: {
        print("πŸ₯· handle cancel:")
    })
    .flatMap { value -> Future<String, NSError> in
        print("🍏 flatMap: \(value)")
        return .init {
            guard value == 1 else {
                $0(.failure(NSError(domain: "test", code: 1)))
                return
            }

            $0(.success("\(value)"))
        }
    }
    .sink(receiveCompletion: {
        switch $0 {
        case .finished:
            print("🍎 sink finished:")
        case .failure(let error):
            print("🍎 sink failure:", error)
        }
    }, receiveValue: {
        print("🍎 sink received: \(String(describing: $0)) πŸŽ‰")
    })

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .finished)
Enter fullscreen mode Exit fullscreen mode

This code has the following output:

πŸ₯· handle output: 1
🍏 flatMap: 1
🍎 sink received: 1 πŸŽ‰
πŸ₯· handle output: 2
🍏 flatMap: 2
🍎 sink failure: Error Domain=test Code=1 "(null)"
πŸ₯· handle output: 3
πŸ₯· handle completion:
Enter fullscreen mode Exit fullscreen mode

After all, the upstream doesn't care if the downstream switched stream fails. This may cause the stream execution process to continue unnecessarily.

My guess is that to account for this, we would have to control it with subscriptions.

Top comments (0)