Yoshinori Imajo

Combine.framework Scheduler recap (Diagram)

A diagram for understanding the Combine Scheduler

Combine allows you to switch queues using a scheduler.

I'll recap the methods receive(on:options:) and subscribe(on:options:), which use a scheduler to do so, as shown in the diagram below.

The following image was made by adding annotations to a slide from the WWDC19 session "Introducing Combine".

combine_scheduler

The following summarizes when work is executed on the queue specified by the scheduler.

  • receive(on:options)
    • Timing of Queue Execution
      • When receiving an element.
        • receive(_ input:)
      • When receiving a completion.
        • receive(completion:)
  • subscribe(on:options)
    • Timing of Queue Execution
      • When a Subscriber subscribes to the Publisher.
        • subscribe(_:)
        • sink(receiveValue:)
        • sink(completion:receiveValue:)
        • ...etc
      • When a Subscriber successfully subscribes to a Publisher and requests an element.
        • receive(subscription:)
      • When Publisher is canceled.

Scheduler Protocol

Scheduler is a protocol, and the following required methods are called:

  • schedule(options:_:)
  • schedule(after:tolerance:options:_:)
  • schedule(after:interval:tolerance:options:_:)
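To make the three requirements concrete, here is a minimal sketch of a custom scheduler. RecordingScheduler and its Log class are hypothetical names introduced only for this illustration. It borrows its time and option types from Combine's ImmediateScheduler, runs every action synchronously, and records which requirement was called, so it shows the shape of a conformance rather than a production scheduler.

```swift
import Combine

// Hypothetical scheduler for illustration: runs every action immediately and
// records which Scheduler requirement was called.
struct RecordingScheduler: Scheduler {
    typealias SchedulerTimeType = ImmediateScheduler.SchedulerTimeType
    typealias SchedulerOptions = ImmediateScheduler.SchedulerOptions

    // Reference type so that copies of the struct share one log.
    final class Log { var calls: [String] = [] }
    let log = Log()

    var now: SchedulerTimeType { ImmediateScheduler.shared.now }
    var minimumTolerance: SchedulerTimeType.Stride {
        ImmediateScheduler.shared.minimumTolerance
    }

    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        log.calls.append("schedule(options:_:)")
        action()
    }

    func schedule(after date: SchedulerTimeType,
                  tolerance: SchedulerTimeType.Stride,
                  options: SchedulerOptions?,
                  _ action: @escaping () -> Void) {
        log.calls.append("schedule(after:tolerance:options:_:)")
        action() // This sketch ignores the requested date.
    }

    func schedule(after date: SchedulerTimeType,
                  interval: SchedulerTimeType.Stride,
                  tolerance: SchedulerTimeType.Stride,
                  options: SchedulerOptions?,
                  _ action: @escaping () -> Void) -> Cancellable {
        log.calls.append("schedule(after:interval:tolerance:options:_:)")
        action() // This sketch runs the action once instead of repeating it.
        return AnyCancellable {}
    }
}

let recorder = RecordingScheduler()
var received: [Int] = []

// Keep the AnyCancellable so the subscription is not canceled early.
let recordingCancellable = Just(1)
    .receive(on: recorder)
    .sink { received.append($0) }

print(received)           // Values delivered through the scheduler.
print(recorder.log.calls) // Which schedule methods the operator used.
```

Printing the log is a simple way to check which of the three methods a given operator relies on.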

Notes from Apple's Documentation

For the reasoning behind the above, see Apple's documentation.

receive(on:options)

Sample code for receive(on:options) is shown below.

let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
// backgroundQueue is assumed to be a scheduler defined elsewhere (for example, a DispatchQueue).

jsonPublisher
    .subscribe(on: backgroundQueue)
    .receive(on: RunLoop.main)
    .subscribe(labelUpdater)

The documentation further states:

receive(on:options:) changes the execution context of downstream messages.

Specifically, the specified queue is used when receive(_ input:) and receive(completion:) are called.

However, it does not affect the call to receive(subscription:).

Note
receive(on:options:) doesn’t affect the scheduler used to call the subscriber’s receive(subscription:) method.
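As a small check of the statement that receive(on:options:) moves downstream value and completion delivery, the sketch below records the queue label seen inside a sink. The helper currentQueueLabel(), the function observeReceiveOn(), and the queue label "receive-queue" are names assumed for this example. A semaphore is used only so the snippet can wait for the asynchronous delivery.

```swift
import Combine
import Dispatch

// Hypothetical helper: label of the dispatch queue the caller is running on.
func currentQueueLabel() -> String {
    String(cString: __dispatch_queue_get_label(nil))
}

// Returns the queue labels observed when the value and the completion arrive.
func observeReceiveOn() -> (value: String, completion: String) {
    let receiveQueue = DispatchQueue(label: "receive-queue")
    let finished = DispatchSemaphore(value: 0)
    var valueLabel = ""
    var completionLabel = ""

    let cancellable = Just(1)
        .receive(on: receiveQueue)
        .sink(
            receiveCompletion: { _ in
                completionLabel = currentQueueLabel()
                finished.signal()
            },
            receiveValue: { _ in
                valueLabel = currentQueueLabel()
            }
        )

    // Keep the subscription alive until the completion has been delivered
    // (or a 2-second timeout passes).
    _ = withExtendedLifetime(cancellable) {
        finished.wait(timeout: .now() + 2)
    }
    return (valueLabel, completionLabel)
}

let receiveLabels = observeReceiveOn()
print("value on: \(receiveLabels.value), completion on: \(receiveLabels.completion)")
```

Both labels are read only after the semaphore is signaled, so the main thread does not race with the closures running on the receive queue.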

subscribe(on:options)

Sample code for subscribe(on:options) is shown below.

let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
// backgroundQueue is assumed to be a scheduler defined elsewhere (for example, a DispatchQueue).

jsonPublisher
    .subscribe(on: backgroundQueue)
    .receive(on: RunLoop.main)
    .subscribe(labelUpdater)

The documentation further states:

subscribe(on:options:) changes the execution context of upstream messages.

Using subscribe(on:options:) also causes the upstream publisher to perform cancel() using the specified scheduler.

In other words, the specified scheduler is used when the Subscriber requests elements from the Publisher.

Cancellation of the Publisher is also executed on that scheduler.

(By the way, when I implemented a Scheduler myself, I found that its schedule method is also executed when the Subscriber subscribes to the Publisher. The open-source library pointfreeco/combine-schedulers uses this to capture events on subscribe.)
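The upstream side of subscribe(on:options:) can be observed in a similar way. The following is a sketch under stated assumptions, not a definitive test: runningQueueLabel(), observeSubscribeOn(), and the label "subscribe-queue" are names made up for this example, and handleEvents is placed upstream of subscribe(on:) to record which queue the subscription, the demand request, and the cancel run on.

```swift
import Combine
import Dispatch

// Hypothetical helper: label of the dispatch queue the caller is running on.
func runningQueueLabel() -> String {
    String(cString: __dispatch_queue_get_label(nil))
}

// Returns the queue label observed for each upstream event.
func observeSubscribeOn() -> [String: String] {
    let subscribeQueue = DispatchQueue(label: "subscribe-queue")
    let requested = DispatchSemaphore(value: 0)
    let canceled = DispatchSemaphore(value: 0)
    var labels: [String: String] = [:]

    // A subject that never emits, so the subscription stays open until canceled.
    let cancellable = PassthroughSubject<Int, Never>()
        .handleEvents(
            receiveSubscription: { _ in
                labels["subscription"] = runningQueueLabel()
            },
            receiveCancel: {
                labels["cancel"] = runningQueueLabel()
                canceled.signal()
            },
            receiveRequest: { _ in
                labels["request"] = runningQueueLabel()
                requested.signal()
            }
        )
        .subscribe(on: subscribeQueue)
        .sink { _ in }

    // Wait (up to 2 seconds each) for the demand request, then for the cancel.
    _ = requested.wait(timeout: .now() + 2)
    cancellable.cancel()
    _ = canceled.wait(timeout: .now() + 2)
    return labels
}

let subscribeLabels = observeSubscribeOn()
print(subscribeLabels)
```

The cancel is issued from the calling thread only after the request has been observed, which keeps the order of events in this sketch predictable.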

Comparison with Rx

The next topic is the difference compared to the Scheduler in Rx.

The Rx documentation describes two operators, observeOn and subscribeOn, and illustrates them with a marble diagram.

Combine probably behaves the same way.
The following code investigates Combine's behavior.

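import Combine
import Foundation

let receiveQueue1 = DispatchQueue(label: "[Receive queue 1]")
let subscribeQueue = DispatchQueue(label: "[Subscribe queue]")
let receiveQueue2 = DispatchQueue(label: "[Receive queue 2]")

// Keep the AnyCancellable returned by sink; if it is released immediately,
// the subscription is canceled before the asynchronous work runs.
let cancellable = Just(1)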
    .flatMap { value -> AnyPublisher<Int, Never> in
        let label = String(validatingUTF8: __dispatch_queue_get_label(nil))!
        print("flatMap execute queue: \(label)") // => [Subscribe queue]

        return Just(value).eraseToAnyPublisher()
    }
    .receive(on: receiveQueue1)
    .map { value -> Int in
        let label = String(validatingUTF8: __dispatch_queue_get_label(nil))!
        print("map execute queue: \(label)") // => [Receive queue 1]

        return value * 10
    }
    .subscribe(on: subscribeQueue)
    .receive(on: receiveQueue2)
    .sink(receiveValue: {
        let label = String(validatingUTF8: __dispatch_queue_get_label(nil))!
        print("sink execute queue: \(label), \($0)") // => [Receive queue 2], 10
    })