Thanks for using Compiler Explorer
Sponsors
Jakt
C++
Ada
Algol68
Analysis
Android Java
Android Kotlin
Assembly
C
C3
Carbon
C with Coccinelle
C++ with Coccinelle
C++ (Circle)
CIRCT
Clean
Clojure
CMake
CMakeScript
COBOL
C++ for OpenCL
MLIR
Cppx
Cppx-Blue
Cppx-Gold
Cpp2-cppfront
Crystal
C#
CUDA C++
D
Dart
Elixir
Erlang
Fortran
F#
GLSL
Go
Haskell
HLSL
Helion
Hook
Hylo
IL
ispc
Java
Julia
Kotlin
LLVM IR
LLVM MIR
Modula-2
Mojo
Nim
Numba
Nix
Objective-C
Objective-C++
OCaml
Odin
OpenCL C
Pascal
Pony
PTX
Python
Racket
Raku
Ruby
Rust
Sail
Snowball
Scala
Slang
Solidity
Spice
SPIR-V
Swift
LLVM TableGen
Toit
Triton
TypeScript Native
V
Vala
Visual Basic
Vyper
WASM
Yul (Solidity IR)
Zig
Javascript
GIMPLE
Ygen
sway
swift source #1
Output
Compile to binary object
Link to binary
Execute the code
Intel asm syntax
Demangle identifiers
Verbose demangling
Filters
Unused labels
Library functions
Directives
Comments
Horizontal whitespace
Debug intrinsics
Compiler
aarch64 swiftc 6.0.3
aarch64 swiftc 6.1
aarch64 swiftc 6.2
x86-64 swiftc 3.1.1
x86-64 swiftc 4.0.2
x86-64 swiftc 4.0.3
x86-64 swiftc 4.1
x86-64 swiftc 4.1.1
x86-64 swiftc 4.1.2
x86-64 swiftc 4.2
x86-64 swiftc 5.0
x86-64 swiftc 5.1
x86-64 swiftc 5.10
x86-64 swiftc 5.2
x86-64 swiftc 5.3
x86-64 swiftc 5.4
x86-64 swiftc 5.5
x86-64 swiftc 5.6
x86-64 swiftc 5.7
x86-64 swiftc 5.8
x86-64 swiftc 5.9
x86-64 swiftc 6.0.3
x86-64 swiftc 6.1
x86-64 swiftc 6.2
x86-64 swiftc devsnapshot
x86-64 swiftc nightly
Options
Source code
import Observation import Synchronization @Observable @MainActor final class N { var value = 1 func increment() { value += 1 } var squares: Observed<Int, Never> { Observed { self.value * self.value } } } @MainActor func producerOutpacingConsumerBreaksObserved() async { let numbers = N() let squares = numbers.squares Task { @MainActor in for await square in squares { print("observed value: \(square)") try? await Task.sleep(for: .milliseconds(500)) } } let maxIters = 10 var iterCount = 0 while iterCount < maxIters { defer { iterCount += 1 } print("producer incrementing value to: \(numbers.value + 1)") numbers.increment() // if production outpaces consumption, the sequence breaks try? await Task.sleep(for: .milliseconds(250)) } } @main struct Test { static func main() async { print("start") await producerOutpacingConsumerBreaksObserved() } } // MARK - public struct Observed<Element: Sendable, Failure: Error>: AsyncSequence, Sendable { final class SharedState<State>: Sendable { let criticalRegion: Mutex<State> init(_ state: consuming sending State) { criticalRegion = Mutex(state) } internal func withCriticalRegion<R, F: Error>(body: (inout sending State) throws(F) -> sending R) throws(F) -> R { try criticalRegion.withLock(body) } } struct State { enum Continuation { case cancelled case active(UnsafeContinuation<Void, Never>) func resume() { switch self { case .cancelled: break case .active(let continuation): continuation.resume() } } } var id = 0 var tracking = false var continuations: [Int: Continuation] = [:] static func generation(_ state: SharedState<State>) -> Int { state.withCriticalRegion { state in defer { state.id &+= 1 } return state.id } } static func cancel(_ state: SharedState<State>, id: Int) { state.withCriticalRegion { state in guard let continuation = state.continuations.removeValue(forKey: id) else { state.continuations[id] = .cancelled return nil as Continuation? } return continuation }?.resume() } static func startTracking(_ state: SharedState<State>) -> Bool { state.withCriticalRegion { state in if !state.tracking { state.tracking = true return true } else { return false } } } static func emitWillChange(_ state: SharedState<State>) { let continuations = state.withCriticalRegion { state in defer { state.continuations.removeAll() } return state.continuations.values } for continuation in continuations { continuation.resume() } } static func willChange(_ state: SharedState<State>, id: Int) async { // demonstrates that this does not run on the iterator's isolation in the current implementation: // MainActor.assertIsolated("not on main actor") return await withUnsafeContinuation { continuation in state.withCriticalRegion { state in if case .cancelled = state.continuations[id] { state.continuations[id] = nil return continuation as UnsafeContinuation<Void, Never>? } else { state.continuations[id] = .active(continuation) return nil as UnsafeContinuation<Void, Never>? } }?.resume() } } } let state: SharedState<State> let emit: @isolated(any) @Sendable () throws(Failure) -> Element? public init( @_inheritActorContext _ emit: @escaping @isolated(any) @Sendable () throws(Failure) -> Element? ) { self.emit = emit self.state = SharedState(State()) } public struct Iterator: AsyncIteratorProtocol { var state: SharedState<State>? let emit: @isolated(any) @Sendable () throws(Failure) -> Element? fileprivate static func trackEmission(isolation trackingIsolation: isolated (any Actor)?, state: SharedState<State>, emit: @escaping @isolated(any) @Sendable () throws(Failure) -> Element?) throws(Failure) -> Element? { let result = withObservationTracking { Result(catching: emit) } onChange: { [state] in State.emitWillChange(state) } return try result.get() } fileprivate mutating func terminate(throwing failure: Failure? = nil, id: Int) throws(Failure) -> Element? { state?.withCriticalRegion { state in state.continuations.removeValue(forKey: id) }?.resume() state = nil if let failure { throw failure } else { return nil } } fileprivate mutating func trackEmission(isolation iterationIsolation: isolated (any Actor)?, state: SharedState<State>, id: Int) async throws(Failure) -> Element? { guard !Task.isCancelled else { return try terminate(id: id) } guard let element = try await Iterator.trackEmission(isolation: emit.isolation, state: state, emit: emit) else { return try terminate(id: id) } return element } public mutating func next(isolation iterationIsolation: isolated (any Actor)? = #isolation) async throws(Failure) -> Element? { guard let state else { return nil } let id = State.generation(state) do { if State.startTracking(state) { return try await trackEmission(isolation: iterationIsolation, state: state, id: id) } else { // alias isolation to demonstrate the issue with actor hopping let isolationAlias = iterationIsolation await withTaskCancellationHandler { // N.B. this closure is currently nonisolated since it does not capture the isolated parameter // isolationAlias?.assertIsolated("not isolated in next()") // and even if we capture isolation in this closure, the call to `State.willChange()` will hop to the global executor // iterationIsolation?.assertIsolated("now we're isolated") await State.willChange(state, id: id) } onCancel: { State.cancel(state, id: id) } return try await trackEmission(isolation: iterationIsolation, state: state, id: id) } } catch { return try terminate(throwing: error, id: id) } } } public func makeAsyncIterator() -> Iterator { Iterator(state: state, emit: emit) } }
Become a Patron
Sponsor on GitHub
Donate via PayPal
Compiler Explorer Shop
Source on GitHub
Mailing list
Installed libraries
Wiki
Report an issue
How it works
Contact the author
CE on Mastodon
CE on Bluesky
Statistics
Changelog
Version tree