soranoba
soranoba Author of soranoba.net
programming

Combineの実行タイミングについて理解する

delegateで自分で実装した場合は、実行の割り込みタイミングは自明だが、Combineについては今まで雰囲気で使っていたので、理解を深めつつ備忘録として調査内容を残す。

環境

  • Swift 6.0
  • Xcode 16.0

Scheduler

何も指定していない時、SchedulerはImmediateSchedulerと同様の動作をする(つまり、割り込み実行)
DispatchQueueを指定した場合はDispatchQueue.asyncのような動作をする

実行スレッド

Receive Schedulerによって決まる。

  • receive(on:)で指定されたスレッドで非同期に実行される
  • receive(on:)で指定しない場合、もしくはImmediateSchedulerに準ずるSchedulerを指定した場合は、send中に同期的に実行される(割り込み実行)

つまり、MainThread以外でPublish可能かつMainThreadでcallbackを実行したい場合は、receive(on: DispatchQueue.main)を指定する必要がある

Combine.Published

@Published varで定義するとPublisherを利用することができる。
このPublisherは以下のように動作する。

  • Subscriber接続時に即座にpublishされる
  • 値更新のwillSet時にpublishされる

つまり、Receive ScedulerImmediateSchedulerの場合にreceive時にプロパティを参照すると前の値が参照される。
特にreceive時にメソッドを呼び出す場合、Publishされてきた値を使用しなければならない点に注意する必要がある。
Schedulerを指定することで常に最新の値を参照することも可能。

排他制御

異なるスレッドから同時にpublishされた場合でも、その回数分のpublishが実行される

ImmediateSchedulerの場合

let data = Data()
cancellables = [
    data.$value1.sink { value1 in
        print("--- receive: \(value1)")
        usleep(2000)
        print("--- receive = \(value1), current = \(data.value1)")
    },
]
(1...5).forEach { value in
    DispatchQueue.global().async {
        usleep(useconds_t(value * 1000))
        print("enqueue: \(value)")
        data.value1 = value
        print("end: \(value), current = \(data.value1)")
    }
}
--- receive: 0
--- receive = 0, current = 0
enqueue: 1
--- receive: 1
enqueue: 2
--- receive = 1, current = 0
enqueue: 3
end: 1, current = 1
--- receive: 2
enqueue: 4
enqueue: 5
--- receive = 2, current = 1
end: 2, current = 2
--- receive: 3
--- receive = 3, current = 2
end: 3, current = 3
--- receive: 5
--- receive = 5, current = 3
end: 5, current = 5
--- receive: 4
--- receive = 4, current = 5
end: 4, current = 4

このようにシーケンシャルに実行されるが、やや直感に反する挙動になる

  • 値の書き込み時にlockが取られ、callbackの処理が割り込み実行される
  • 複数の書込みが待機している場合、待機した順番とは異なる順番で処理される可能性がある

SerialQueueの場合

let data = Data()
cancellables = [
    data.$value1.receive(on: queue).sink { value1 in
        usleep(1000)
        print("receive = \(value1), current = \(data.value1)")
    },
]
let lock = NSLock()
(1...5).forEach { value in
    DispatchQueue.global().async {
        usleep(useconds_t(Int.random(in: 0...100)))
        lock.withLock {
            print("enqueue: \(value)")
            data.value1 = value
        }
    }
}
enqueue: 3
enqueue: 5
enqueue: 1
enqueue: 2
enqueue: 4
receive = 0, current = 4
receive = 3, current = 4
receive = 5, current = 4
receive = 1, current = 4
receive = 2, current = 4
receive = 4, current = 4

こちらはenqueueした順番に確実に処理することができる。

Subscribe Scheduler

Upstreamへのメッセージ送信のSchedulerを指定することができる.
Upstreamへのメッセージの代表的な物に、subscribeリクエストがある

@MainActor func execute4() async {
    let data = Data()
    cancellables = [
        data.$value1.subscribe(on: DispatchQueue.main).sink { value1 in
            print("argument = \(value1), current = \(data.value1), isMainThread = \(Thread.isMainThread)")
        },
    ]

    for num in 1...5 {
        data.value1 = num
    }
    try? await Task.sleep(for: .seconds(0.1))
    for num in 6...10 {
        data.value1 = num
    }
}
argument = 5, current = 5, isMainThread = true
argument = 6, current = 5, isMainThread = true
argument = 7, current = 6, isMainThread = true
argument = 8, current = 7, isMainThread = true
argument = 9, current = 8, isMainThread = true
argument = 10, current = 9, isMainThread = true

このように、subscribeを非同期にすることでsubscribe開始を遅延させることができる。
他にも特殊な使い方ができると思われるが、直感に反する挙動になりがちなので注意が必要かもしれない。

タイミング制御

様々な制御があるが、代表的なユースケースとして、複数のプロパティに対して全ての書込みが完了してから纏めて実行するという物がある。
これはdebounce(for:scheduler:)を使うことで制御できる。

let data = Data()
cancellables = [
    Publishers.CombineLatest(data.$value1, data.$value2)
        .debounce(for: .seconds(0), scheduler: DispatchQueue.main)
        .sink { value1, value2 in
            print("receive: value1 = \(value1), value2 = \(value2) / current: value1 = \(data.value1), value2 = \(data.value2)")
        }
]

Task { @MainActor in
    try? await Task.sleep(for: .seconds(0.1))
    (11...20).forEach { data.value1 = $0 }
    (21...30).forEach { data.value2 = $0 }
}
receive: value1 = 0, value2 = 0 / current: value1 = 0, value2 = 0
receive: value1 = 20, value2 = 30 / current: value1 = 20, value2 = 30

上記はメインスレッドで実行している為、全てが終わってから処理される。
別スレッドで書込みをしている場合は、全て完了する前に実行されるが、それでもある程度は纏めて実行される。万が一パフォーマンスが気になる場合は、時間を調整すると良い。


この記事中のコードはGitHub上で確認できる。

(Updated: )

comments powered by Disqus