Kotlin Coroutines · Reactive Streams

Flow, StateFlow, SharedFlow & Channels

A complete deep dive into Kotlin's reactive stream primitives — how they work, how they differ, and exactly when to reach for each one.

cold streams · state holders · event broadcast · one-shot events · backpressure · operators
00 · The Fundamental Split

Cold vs Hot Streams

Get this distinction straight before looking at the individual types: it determines whether your data source activates on demand or runs independently of listeners.

Cold Stream
The producer code runs only when collected. Each new collector gets its own independent execution from the start, like an on-demand video that plays from the beginning for every viewer. No collectors = no work done. Flow is cold.
Hot Stream
The producer runs independently — whether or not anyone is listening. Late collectors see only current/future values, not past ones (unless replay is configured). StateFlow, SharedFlow, and Channel are hot.
Mental model: Cold flow = a function call that generates a sequence on demand. Hot flow = a live radio broadcast you tune into. You can tune in anytime, but you only hear what's playing right now (unless the station has a replay buffer).

| Property | Flow (cold) | StateFlow (hot) | SharedFlow (hot) | Channel (hot) |
| --- | --- | --- | --- | --- |
| Activates on collect? | Yes | No | No | No |
| Multiple collectors | Each gets own stream | Shared, same values | Shared, same events | Values split among collectors |
| Replays past values | All (cold restart) | Last value always | Configurable (0..n) | No replay |
| Has current state | No | Yes (.value) | No | No |
| Backpressure handling | Suspend-based | Drops intermediate values for slow collectors | Configurable | Suspend / buffer |
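
The cold/hot split can be observed directly. The sketch below is illustrative only: the function names `coldRuns` and `hotLateSubscriber` are made up for this example, and it assumes kotlinx.coroutines is on the classpath. It counts how often a cold producer block runs, then shows a late subscriber to a hot `MutableSharedFlow` (with the default `replay = 0`) missing a value emitted before it arrived.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Cold: the builder block runs once per collector.
fun coldRuns(): Int = runBlocking {
    var producerRuns = 0
    val cold = flow {
        producerRuns++              // executed on every collection
        emit(1)
        emit(2)
    }
    cold.toList()                   // first collector
    cold.toList()                   // second collector restarts the block
    producerRuns
}

// Hot: emissions happen with or without subscribers; a late one sees only new values.
fun hotLateSubscriber(): List<Int> = runBlocking {
    val hot = MutableSharedFlow<Int>()          // replay = 0 by default
    hot.emit(1)                                 // nobody is listening, so this value is lost
    val seen = mutableListOf<Int>()
    val job = launch { hot.take(2).toList(seen) }
    hot.subscriptionCount.first { it == 1 }     // wait until the subscriber is registered
    hot.emit(2)
    hot.emit(3)
    job.join()
    seen
}

fun main() {
    println(coldRuns())
    println(hotLateSubscriber())
}
```

Under these assumptions the cold block runs twice (once per collector), while the hot subscriber receives 2 and 3 but never 1.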
01 · Cold Stream

Flow

The foundational reactive primitive. A cold, sequential stream of values — nothing runs until someone collects it, and each collector gets the full sequence from scratch.

A Flow<T> is conceptually a suspending function that emits multiple values. The producer block is defined using the flow { } builder, which gives you access to emit(). The entire producer block executes in a coroutine, in the collector's coroutine context unless a flowOn operator applied after the builder moves it elsewhere.

flow { emit(1) · emit(2) · emit(3) } — one collector, sequential
Flow basics
// Creating a Flow — the producer block is NOT executed here
fun temperatureReadings(): Flow<Double> = flow {
    while (true) {
        val temp = thermometer.read()   // suspending call
        emit(temp)                        // suspend until collector ready
        delay(1000)
    }
}

// Collecting — NOW the producer block runs
viewModelScope.launch {
    temperatureReadings()
        .map { it * 9/5 + 32 }           // Celsius → Fahrenheit
        .filter { it > 98.6 }              // only fevers
        .collect { temp ->
            Log.d("Fever", "$temp°F")
        }
}

// Two collectors = two independent producer executions
val flow = temperatureReadings()
launch { flow.collect { /* collector A — own coroutine */ } }
launch { flow.collect { /* collector B — independent execution */ } }

Context preservation

By default, a Flow emits in the collector's coroutine context: the producer runs on whatever dispatcher the collect call uses. Use flowOn(dispatcher) to move the upstream producer to a different context. For a flow { } builder, this is the supported way to change where the producer runs.

flowOn — context switching
fun searchDatabase(query: String): Flow<List<Result>> =
    flow {
        val results = db.search(query)   // runs on IO
        emit(results)
    }.flowOn(Dispatchers.IO)          // ← moves UPSTREAM to IO thread

// The collect{} block always runs on the COLLECTOR's context (Main)
viewModelScope.launch {            // ← this is Main
    searchDatabase("kotlin")
        .collect { results ->
            binding.list.adapter = ResultAdapter(results)  // safe: Main
        }
}

// ✗ WRONG: do NOT call emit() from inside withContext in a flow builder
val badFlow = flow {
    withContext(Dispatchers.IO) {    // throws IllegalStateException
        emit(fetchData())
    }
}

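Context preservation rule: A Flow must emit in the same context it was collected in. Calling emit() from inside withContext in a flow { } builder violates this and throws an IllegalStateException. Apply flowOn to the flow instead, or use withContext only to compute a value and call emit() outside of it.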
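
A minimal sketch of the rule, assuming kotlinx.coroutines is available. `loadValue` is a hypothetical stand-in for a slow data source, and the other function names are invented for the example. It shows the failing pattern next to two patterns that respect context preservation.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a slow or blocking data source.
fun loadValue(): Int = 42

// Violates context preservation: emit() is called from a different context.
fun emitInsideWithContext(): String = runBlocking {
    val bad = flow {
        withContext(Dispatchers.Default) { emit(loadValue()) }
    }
    try {
        bad.toList()
        "no error"
    } catch (e: IllegalStateException) {
        "IllegalStateException"
    }
}

// Two variants that respect the rule.
fun emitCorrectly(): List<Int> = runBlocking {
    // Variant 1: flowOn moves the whole upstream block to another dispatcher.
    val viaFlowOn = flow { emit(loadValue()) }.flowOn(Dispatchers.Default)
    // Variant 2: withContext only computes the value; emit() stays outside of it.
    val viaReturnValue = flow {
        val value = withContext(Dispatchers.Default) { loadValue() }
        emit(value)
    }
    viaFlowOn.toList() + viaReturnValue.toList()
}

fun main() {
    println(emitInsideWithContext())
    println(emitCorrectly())
}
```

The second variant is sometimes handy when only one call inside a larger producer block needs a different dispatcher.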

Exception handling

Error handling in flows
userFlow()
    .catch { e ->                          // catches UPSTREAM exceptions
        emit(User.empty())                   // can emit a fallback value
        logger.error("Flow error", e)
    }
    .onEach { user ->                       // side effect per emission
        analytics.track("user_loaded")
    }
    .onCompletion { cause ->               // always fires — success or error
        if (cause != null) showError(cause)
    }
    .collect { user -> render(user) }

// catch does NOT catch exceptions in collect{} — only upstream
// For collect exceptions: wrap collect{} in try/catch
try {
    flow.collect { value ->
        if (value < 0) throw IllegalStateException("negative")
    }
} catch (e: Exception) { handleError(e) }
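
To make the "upstream only" behaviour of catch concrete, here is a small self-contained sketch (function names are invented for the example; kotlinx.coroutines is assumed). The first function fails inside the producer, the second inside the collector.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Upstream failure: catch sees it and can emit a fallback value.
fun upstreamFailure(): List<Int> = runBlocking {
    flow {
        emit(1)
        throw IllegalStateException("upstream failed")
    }
        .catch { emit(-1) }
        .toList()
}

// Downstream failure: the collector sits below catch, so the operator is bypassed.
fun downstreamFailure(): String = runBlocking {
    var catchInvoked = false
    try {
        flow { emit(1) }
            .catch { catchInvoked = true }
            .collect { value -> throw IllegalArgumentException("collector failed on $value") }
        "completed"
    } catch (e: IllegalArgumentException) {
        "try/catch saw it, catch operator invoked: $catchInvoked"
    }
}

fun main() {
    println(upstreamFailure())
    println(downstreamFailure())
}
```

The first call yields the emitted value followed by the fallback; the second shows that only the surrounding try/catch observes the collector's exception.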
02 · Hot State Holder

StateFlow

A hot flow that holds a single current value, always available synchronously. The reactive successor to LiveData — it's the standard for exposing UI state from a ViewModel.

Think of StateFlow as an observable variable. It always has a value (you must provide an initial one). When the value changes, all current collectors receive the new value. Late collectors immediately get the current value upon subscription.

Crucially, StateFlow has conflation: if a collector is slow and multiple values are emitted rapidly, it only receives the latest value — intermediate values are dropped. This makes it ideal for state representation but wrong for events.

StateFlow — always has current value, late subscribers get it immediately
StateFlow in a ViewModel
class ProfileViewModel : ViewModel() {

    // Private mutable — only ViewModel can write
    private val _uiState = MutableStateFlow<ProfileState>(ProfileState.Loading)

    // Public immutable — UI reads only
    val uiState: StateFlow<ProfileState> = _uiState.asStateFlow()

    // Read current value synchronously — no coroutine needed
    val currentState: ProfileState get() = _uiState.value

    fun loadProfile(id: String) {
        viewModelScope.launch {
            _uiState.update { ProfileState.Loading }   // atomic update
            try {
                val profile = repo.getProfile(id)
                _uiState.update { ProfileState.Success(profile) }
            } catch (e: CancellationException) {
                throw e                                 // let cancellation propagate
            } catch (e: Exception) {
                _uiState.update { ProfileState.Error(e.message) }
            }
        }
    }
}

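// In Fragment: lifecycle-safe collection
viewLifecycleOwner.lifecycleScope.launch {
    // Call it on viewLifecycleOwner; a bare repeatOnLifecycle would follow the Fragment's own lifecycle
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {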
        viewModel.uiState.collect { state ->
            when (state) {
                is ProfileState.Loading  -> showLoading()
                is ProfileState.Success  -> render(state.profile)
                is ProfileState.Error    -> showError(state.message)
            }
        }
    }
}
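
Conflation and equality-based deduplication are easy to see outside Android. The following sketch is an illustration under stated assumptions: the delays are arbitrary and only simulate a slow collector, `slowCollectorSees` is a made-up name, and kotlinx.coroutines is assumed to be available.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun slowCollectorSees(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    val job = launch {
        state.collect { value ->
            seen.add(value)
            delay(100)          // simulate slow rendering
        }
    }

    delay(20)                   // the collector has received the initial 0 and is now busy
    state.value = 1             // overwritten before the collector looks again
    state.value = 2             // overwritten as well
    state.value = 3             // latest value wins
    delay(300)                  // let the collector catch up
    state.value = 3             // equal to the current value: no new emission
    delay(150)
    job.cancel()
    seen
}

fun main() {
    println(slowCollectorSees())
}
```

With these timings the collector records the initial value and then only the latest one; the intermediate 1 and 2 never reach it, which is why StateFlow suits state but not events.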

stateIn — converting a cold Flow to StateFlow

stateIn is the operator you will reach for most often in production ViewModels. It converts a cold Flow (e.g., from Room) into a StateFlow with configurable sharing and lifecycle behavior.

stateIn — the production pattern
val users: StateFlow<List<User>> = repo.getAllUsers()  // cold Flow from Room
    .map { it.filter { u -> u.isActive } }
    .stateIn(
        scope = viewModelScope,
        // WhileSubscribed(5000): keep upstream alive for 5s after the last
        // subscriber leaves, long enough to bridge the brief gap during rotation
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = emptyList()
    )

// SharingStarted options:
// Eagerly       : starts immediately, runs until the scope is cancelled
// Lazily        : starts on first subscriber, then runs until the scope is cancelled
// WhileSubscribed(stopTimeout) : starts/stops with subscribers
//   └── the 5000ms grace period prevents restart on rotation
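
Outside Android, the role of initialValue can be sketched as follows. This is a simplified illustration: a manually created scope stands in for viewModelScope, the upstream is a toy flow rather than a Room query, and `SharingStarted.Eagerly` is used so that the example does not depend on subscriber timing.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

fun initialThenUpstream(): Pair<Int, Int> = runBlocking {
    // Stand-in for viewModelScope; cancelled manually at the end.
    val scope = CoroutineScope(coroutineContext + Job())

    val upstream = flow {
        delay(100)                          // pretend to query a database
        emit(42)
    }
    val state = upstream.stateIn(scope, SharingStarted.Eagerly, initialValue = 0)

    val before = state.value                // readable at once: still the initial value
    val after = state.first { it != 0 }     // suspends until the upstream has emitted
    scope.cancel()                          // stops the sharing coroutine
    before to after
}

fun main() {
    println(initialThenUpstream())
}
```

The same shape applies with WhileSubscribed; the difference is only when the upstream is started and stopped.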
03 · Hot Event Broadcast

SharedFlow

A hot flow for broadcasting events to multiple subscribers simultaneously. Unlike StateFlow, it has no "current value" concept — it's designed for events and notifications, not state.

SharedFlow is the most configurable of these primitives. Three parameters of the MutableSharedFlow() factory function control its behavior: replay sets how many past values new subscribers receive; extraBufferCapacity adds buffer space beyond the replay cache so a fast producer does not suspend immediately; and onBufferOverflow decides what happens when the buffer fills up.

SharedFlow — event bus pattern
class FeedViewModel(private val repo: FeedRepository) : ViewModel() {

    // replay=0: no history. New subscribers see nothing until next emit.
    // extraBufferCapacity=1: don't suspend emitter for 1 pending event.
    private val _events = MutableSharedFlow<UiEvent>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val events: SharedFlow<UiEvent> = _events

    fun onLikeClicked(postId: String) {
        viewModelScope.launch {
            repo.likePost(postId)
            // tryEmit: non-suspending. With DROP_OLDEST it always returns true;
            // it can return false only with the default SUSPEND strategy.
            _events.tryEmit(UiEvent.ShowSnackbar("Liked!"))
            // emit: suspending in general, but with DROP_OLDEST it never waits
            // _events.emit(UiEvent.ShowSnackbar("Liked!"))
        }
    }

    fun onDeleteClicked(postId: String) {
        viewModelScope.launch {
            repo.deletePost(postId)
            _events.emit(UiEvent.NavigateBack)
        }
    }
}

// Collecting in Fragment (repeatOnLifecycle is suspending, so it needs a coroutine)
viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.events.collect { event ->
            when (event) {
                is UiEvent.ShowSnackbar  -> showSnackbar(event.message)
                is UiEvent.NavigateBack  -> findNavController().popBackStack()
                is UiEvent.ShowDialog    -> showDialog(event.config)
            }
        }
    }
}

replay — the buffer for late subscribers

The replay parameter is SharedFlow's most subtle knob. Setting replay=0 means late subscribers miss past events entirely, which suits one-shot UI events (navigation, toasts) where replaying after rotation would re-trigger them. The flip side is that an event emitted while nobody is collecting is dropped. Setting replay=1 turns SharedFlow into something approaching StateFlow, but without an initial value and without equality-based deduplication.

Replay cache: what late subscribers receive after E₁, E₂, E₃ have been emitted
replay = 0: late subscriber sees nothing from the past
replay = 1: the last emitted value (E₃) is replayed
replay = 3: the last 3 values (E₁, E₂, E₃) are replayed in order
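
The replay behaviour above can be checked with a few lines. In this sketch (the function name is invented; kotlinx.coroutines is assumed) three events are emitted before anyone subscribes, and the replayCache property is read as a snapshot of what a subscriber arriving at that moment would receive first.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking

fun lateSubscriberReceives(replay: Int): List<String> = runBlocking {
    val events = MutableSharedFlow<String>(replay = replay)
    // No subscribers yet, so emit() returns at once and only feeds the replay cache.
    events.emit("E1")
    events.emit("E2")
    events.emit("E3")
    // Snapshot of the values a new subscriber would be handed on subscription.
    events.replayCache
}

fun main() {
    println(lateSubscriberReceives(0))
    println(lateSubscriberReceives(1))
    println(lateSubscriberReceives(3))
}
```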

onBufferOverflow strategies

SUSPEND
Emitter suspends until a collector drains the buffer. Provides true backpressure. Good for producer-consumer scenarios where no data should be lost.
DROP_OLDEST
Drops the oldest buffered value to make room for the new one. Good for events where the latest is most relevant and old ones can be discarded.
DROP_LATEST
Drops the newest value being emitted if the buffer is full. The emitter does not suspend. Good for sensor data where dropping new readings is acceptable.
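
The three strategies can be compared with tryEmit, which reports whether a value was accepted without suspending. The sketch below is an illustration with invented names (`OverflowResult`, `overflowDemo`) and a single buffer slot; it relies on the fact that overflow handling only applies while at least one subscriber is registered.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class OverflowResult(val accepted: List<Boolean>, val received: List<Int>)

fun overflowDemo(strategy: BufferOverflow): OverflowResult = runBlocking {
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = strategy)
    val received = mutableListOf<Int>()
    val subscriber = launch { events.collect { received.add(it) } }
    events.subscriptionCount.first { it == 1 }   // wait for the subscriber to register

    // Both calls run back to back, before the subscriber gets a chance to run,
    // so the second call finds the single buffer slot occupied.
    val accepted = listOf(events.tryEmit(1), events.tryEmit(2))

    delay(50)                                    // let the subscriber drain the buffer
    subscriber.cancel()
    OverflowResult(accepted, received)
}

fun main() {
    for (strategy in BufferOverflow.values()) {
        println("$strategy -> ${overflowDemo(strategy)}")
    }
}
```

Note that only SUSPEND makes tryEmit report failure; with DROP_LATEST the call reports success even though the new value is discarded.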

shareIn — converting cold Flow to SharedFlow

shareIn
val locationUpdates: SharedFlow<Location> =
    locationProvider.updates()        // cold Flow from GPS hardware
        .shareIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            replay = 1                  // late subscribers get last location
        )
// Now multiple collectors share ONE GPS subscription instead of
// each opening their own (expensive!) GPS connection.
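
The "one upstream for many collectors" effect can be sketched without GPS hardware. Everything here is a stand-in: a counter replaces the expensive connection, a manually created scope replaces viewModelScope, and `upstreamStarts` is an invented name. kotlinx.coroutines is assumed.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns how many times the upstream block was started for two concurrent collectors.
fun upstreamStarts(share: Boolean): Int = runBlocking {
    var starts = 0                          // everything runs on one thread here
    val source = flow {
        starts++                            // stands in for opening the expensive connection
        var tick = 0
        while (true) {
            emit(tick++)
            delay(20)
        }
    }
    val scope = CoroutineScope(coroutineContext + Job())    // stand-in for viewModelScope
    val updates =
        if (share) source.shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)
        else source

    val a = async { updates.take(3).toList() }
    val b = async { updates.take(3).toList() }
    a.await()
    b.await()
    scope.cancel()                          // stops the sharing coroutine
    starts
}

fun main() {
    println(upstreamStarts(share = false))
    println(upstreamStarts(share = true))
}
```

Without sharing, each collector starts its own upstream; with shareIn both are served by a single one.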
04 · Hot Point-to-Point

Channel

A coroutine primitive for communicating between coroutines. Unlike flows, channels are not multicast: each value is consumed by exactly one receiver. Think of it as a typed queue that suspends instead of blocking.

A Channel is essentially the coroutine counterpart of a concurrent blocking queue. send() puts a value in; receive() takes one out. Both can suspend: send suspends if the buffer is full; receive suspends if the channel is empty. This is the fundamental coroutine mechanism for the producer-consumer pattern.

Channels vs SharedFlow: When two coroutines share a Channel, only one of them receives each value. With SharedFlow, all subscribers see every emission. This makes Channels the right tool for work distribution (task queues, pipelines) and SharedFlow right for observation.

Channel basics
// Basic channel — producer/consumer
val channel = Channel<Int>(Channel.BUFFERED)  // buffer of 64

// Producer coroutine
launch {
    for (i in 1..100) {
        channel.send(i)     // suspends if buffer full
    }
    channel.close()       // signal completion
}

// Consumer coroutine — for-loop until channel closed
launch {
    for (value in channel) {
        processItem(value)
    }
}

// Multiple consumers — values are SPLIT between them (not broadcast)
repeat(4) { workerId ->
    launch {
        for (task in channel) {
            Log.d("Worker", "Worker $workerId processing task $task")
        }
    }
}
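
A runnable version of the fan-out idea, without Android logging, might look like this. The function name and the way results are recorded per worker are choices made for the example; which worker gets which task is not deterministic, so only the overall set of tasks should be relied on.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns, per worker, the tasks that worker received.
fun fanOut(workers: Int, tasks: Int): List<List<Int>> = runBlocking {
    val channel = Channel<Int>()                    // rendezvous channel (default capacity)
    val received = List(workers) { mutableListOf<Int>() }

    coroutineScope {
        repeat(workers) { id ->
            launch { for (task in channel) received[id].add(task) }
        }
        launch {
            for (task in 1..tasks) channel.send(task)
            channel.close()                         // ends every worker's for-loop
        }
    }
    received
}

fun main() {
    val perWorker = fanOut(workers = 3, tasks = 10)
    perWorker.forEachIndexed { id, tasks -> println("worker $id: $tasks") }
    println(perWorker.flatten().sorted())
}
```

Every task shows up exactly once across all workers, which is the property that distinguishes a Channel from a SharedFlow.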

Channel capacity types

Channel buffer strategies
RENDEZVOUS (0): No buffer. send() suspends until receive() is called. True synchronous handoff.
BUFFERED (64 by default): 64-slot buffer. send() suspends only when full.
CONFLATED: Only keeps the most recent value. send() never suspends. Like MutableStateFlow but for one-shot access.
UNLIMITED: Unbounded buffer. send() never suspends but can OOM.
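
The capacity types can be probed with trySend, the non-suspending counterpart of send(). The helper names below are invented for this sketch; it creates a fresh channel with no receiver and counts how many sends are accepted.

```kotlin
import kotlinx.coroutines.channels.Channel

// How many of `attempts` non-suspending sends does a fresh channel with no receiver accept?
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    return (1..attempts).count { channel.trySend(it).isSuccess }
}

// A conflated channel keeps only the most recent element.
fun conflatedKeeps(): Int? {
    val channel = Channel<Int>(Channel.CONFLATED)
    (1..5).forEach { channel.trySend(it) }
    return channel.tryReceive().getOrNull()
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS))   // nothing fits without a waiting receiver
    println(acceptedWithoutReceiver(2))                    // fixed buffer of two slots
    println(acceptedWithoutReceiver(Channel.CONFLATED))    // every send succeeds by overwriting
    println(acceptedWithoutReceiver(Channel.UNLIMITED))    // every send is buffered
    println(conflatedKeeps())
}
```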

produce { } and channelFlow { }: bridging channels and Flow

produce builder & channelFlow
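// produce{}: launches a coroutine and exposes its output as a ReceiveChannel.
// It is an extension on CoroutineScope, so call it inside a scope (like launch above).
val numbers: ReceiveChannel<Int> = produce {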
    for (i in 1..10) {
        send(i)
        delay(100)
    }
}

// channelFlow{} — Flow API with Channel semantics
// Bridge: you need Flow's operators but Channel's concurrent sends
fun mergedSensors(): Flow<SensorData> = channelFlow {
    launch { sensor1.readings().collect { send(it) } }
    launch { sensor2.readings().collect { send(it) } }
    launch { sensor3.readings().collect { send(it) } }
    // All three sensors feed into one Flow concurrently
}
// Note: regular flow{} can only call emit() from one coroutine.
// channelFlow{} allows concurrent send() from multiple coroutines.
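
A self-contained variant of the merge pattern is sketched below. `sensor` is a hypothetical stand-in for the real sensor sources, and the periods are arbitrary. Because the two producers run concurrently, the interleaving is not fixed, so the example sorts the result before printing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical sensor: emits "<name>-0", "<name>-1", ... with a pause after each reading.
fun sensor(name: String, periodMs: Long, count: Int = 3): Flow<String> = flow {
    repeat(count) { i ->
        emit("$name-$i")
        delay(periodMs)
    }
}

fun mergedReadings(): Flow<String> = channelFlow {
    // Each launch is a separate producer coroutine; send() may be called from all of them.
    launch { sensor("a", 10).collect { send(it) } }
    launch { sensor("b", 15).collect { send(it) } }
    // The resulting flow completes once both launched children have finished.
}

fun main() = runBlocking {
    println(mergedReadings().toList().sorted())
}
```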
05 · Transform

Flow Operators

Operators are the transformation layer between producer and consumer. They form a pipeline where each operator creates a new Flow wrapping the previous one.

Pipeline: flow { } (upstream) → .map { } (transform T) → .filter { } (predicate) → .flowOn(IO) (context) → .collect { } (terminal)
Essential operators
val searchFlow: Flow<String> = searchField.textChanges()

searchFlow
    // Transformation
    .map { query -> query.trim().lowercase() }
    .filter { it.length >= 3 }                // only non-trivial queries
    .distinctUntilChanged()                    // skip identical consecutive

    // Timing
    .debounce(300)                             // wait 300ms of inactivity

    // Switching — cancel previous search on new query
    .flatMapLatest { query ->                  // ← KEY for search!
        repo.search(query)                     // cancels previous call
    }

    // Error handling
    .catch { e -> emit(emptyList()) }

    // Side effects (don't transform)
    .onEach { results -> analytics.log(results.size) }
    .onStart { showLoading() }
    .onCompletion { hideLoading() }

    // Terminal
    .collect { results -> adapter.submitList(results) }

The flatMap family — concurrency control

These are the most powerful and most nuanced operators. They all take each emitted value and produce an inner Flow from it — but differ in how they handle concurrent inner Flows:

| Operator | When new value arrives | Use case |
| --- | --- | --- |
| flatMapLatest | Cancels previous inner Flow, starts new one | Search, live queries: only the latest matters |
| flatMapConcat | Queues: waits for the previous inner Flow to complete first | Ordered operations, sequential processing |
| flatMapMerge | Runs inner Flows concurrently, up to the concurrency limit (16 by default) | Parallel downloads, fire-and-collect |
| transformLatest | Like flatMapLatest but with arbitrary emit logic | Complex transformations that need cancellation |
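
The difference between the three flatMap variants shows up when the same outer and inner flows are run through each of them. The sketch below uses invented helpers (`requests`, `work`) and real-time delays chosen so that a new outer value always arrives while the previous inner flow is still running; the operators are opt-in APIs in kotlinx.coroutines, hence the annotations.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Outer flow: 1, 2, 3 with a 200 ms pause after each value.
fun requests(): Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)
        delay(200)
    }
}

// Inner flow: "<i>-start" at once, "<i>-done" 300 ms later.
fun work(i: Int): Flow<String> = flow {
    emit("$i-start")
    delay(300)
    emit("$i-done")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun concatOrder(): List<String> = runBlocking { requests().flatMapConcat { work(it) }.toList() }

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun latestOrder(): List<String> = runBlocking { requests().flatMapLatest { work(it) }.toList() }

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun mergeOrder(): List<String> = runBlocking { requests().flatMapMerge { work(it) }.toList() }

fun main() {
    println("concat: ${concatOrder()}")
    println("latest: ${latestOrder()}")
    println("merge:  ${mergeOrder()}")
}
```

With these timings, flatMapConcat finishes each item before starting the next, flatMapLatest only lets the last item reach "done", and flatMapMerge interleaves all three.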
06 · Multi-stream

Combining Flows

Real apps need multiple data sources coordinated together. These operators merge, zip, and interleave streams.

Combining operators
// combine: once every flow has emitted at least once, re-emits whenever ANY of them emits (using the latest value of each)
combine(userFlow, settingsFlow) { user, settings ->
    UiModel(user.name, settings.theme)
}.collect { model -> render(model) }

// zip — pairs emissions by index — waits for both to emit
flowA.zip(flowB) { a, b -> "$a-$b" }
// flowA: 1, 2, 3     flowB: a, b, c     result: "1-a", "2-b", "3-c"

// merge — interleaves N flows (order by arrival time)
merge(flow1, flow2, flow3).collect { value -> handle(value) }

// Real example: combine user + subscription + features
val dashboard: StateFlow<DashboardState> = combine(
    repo.getUser(),
    repo.getSubscription(),
    repo.getFeatureFlags()
) { user, sub, flags ->
    DashboardState(
        userName = user.name,
        isPro = sub.isPro,
        showNewFeature = flags.newDashboard
    )
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), DashboardState.Loading)
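
To see how zip and combine treat the same inputs differently, consider two toy flows with staggered timings. The flows and function names are invented for this sketch, and the delays only serve to fix the order of arrival.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Emits 1, 2, 3 at roughly t = 0, 200 and 400 ms when collected freely.
fun numbers(): Flow<Int> = flow {
    emit(1); delay(200)
    emit(2); delay(200)
    emit(3)
}

// Emits "a", "b" at roughly t = 100 and 500 ms.
fun letters(): Flow<String> = flow {
    delay(100); emit("a")
    delay(400); emit("b")
}

// zip pairs the n-th number with the n-th letter and stops when either side ends.
fun zipped(): List<String> = runBlocking {
    numbers().zip(letters()) { n, l -> "$n$l" }.toList()
}

// combine re-emits on every new value from either side, using the latest of both.
fun combined(): List<String> = runBlocking {
    numbers().combine(letters()) { n, l -> "$n$l" }.toList()
}

fun main() {
    println(zipped())
    println(combined())
}
```

Here zip produces one result per pair and never uses the unmatched 3, while combine produces a result for each change once both flows have emitted.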
07 · Production

Android Patterns

Flows become dangerous if collected outside a lifecycle-aware scope. These patterns keep your app safe and efficient.

The golden rule: repeatOnLifecycle

Avoid a bare lifecycleScope.launch { flow.collect { } }: it keeps collecting while the Activity is stopped (in the background), which wastes CPU and can crash when the collector touches UI that has been destroyed. Always wrap the collection in repeatOnLifecycle:

Safe collection pattern
// ✗ UNSAFE — keeps collecting in background
lifecycleScope.launch {
    viewModel.state.collect { render(it) }
}

// ✓ SAFE: collection is cancelled when STOPPED and restarted when STARTED again
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        // Collect multiple flows concurrently within the same lifecycle
        launch { viewModel.uiState.collect { render(it) } }
        launch { viewModel.events.collect { handleEvent(it) } }
    }
}

// ✓ COMPOSE: lifecycle-aware collection
// (collectAsStateWithLifecycle comes from androidx.lifecycle:lifecycle-runtime-compose)
val state by viewModel.uiState.collectAsStateWithLifecycle()
//                                ↑ stops collection when UI is not visible
//                                  USE THIS not .collectAsState()

The complete ViewModel pattern

Production ViewModel
class ArticleViewModel(
    private val repo: ArticleRepository,
    private val savedState: SavedStateHandle
) : ViewModel() {

    // State — use StateFlow, exposed read-only
    val articles: StateFlow<ArticleState> =
        savedState.getStateFlow("filter", Filter.All)
            .flatMapLatest { filter -> repo.getArticles(filter) }
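            // Widen the element type to ArticleState so catch below may emit Error (Kotlin 1.7+ syntax)
            .map<_, ArticleState> { ArticleState.Success(it) }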
            .catch { emit(ArticleState.Error(it.message)) }
            .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), ArticleState.Loading)

    // One-shot events — use SharedFlow with replay=0
    private val _events = MutableSharedFlow<UiEvent>(extraBufferCapacity = 1)
    val events = _events.asSharedFlow()

    fun onArticleClicked(article: Article) {
        viewModelScope.launch {
            _events.emit(UiEvent.Navigate("/article/${article.id}"))
        }
    }

    fun onBookmarkClicked(article: Article) {
        viewModelScope.launch {
            repo.toggleBookmark(article.id)
            _events.tryEmit(UiEvent.ShowSnackbar("Bookmarked"))
        }
    }
}
Decision rule: Need current state in the ViewModel readable synchronously? → StateFlow. Need to fire one-shot events (navigate, toast) that shouldn't replay on rotation? → SharedFlow(replay=0). Need to transform a Repository cold Flow? → pipe it through stateIn/shareIn. Need a work queue between coroutines? → Channel.

08 · Reference

Full Comparison

The complete decision matrix — when to reach for each primitive.

| Question | Flow | StateFlow | SharedFlow | Channel |
| --- | --- | --- | --- | --- |
| Cold or hot? | Cold | Hot | Hot | Hot |
| Has initial value? | No | Required | No | No |
| Read .value sync? | No | Yes | No | No |
| Multiple subscribers? | Each gets own | All shared | All shared | Values split |
| Replay for late subs? | All (cold start) | Last value | Configurable | None |
| Equality dedup? | No | Yes (skips same) | No | No |
| Use for UI state? | Via stateIn | Primary choice | Not ideal | No |
| Use for one-shot events? | No | No (replays!) | Yes (replay=0) | Yes |
| Use for work queues? | No | No | No | Primary choice |
| Backpressure? | Suspend-based | Conflates | Configurable | Buffer + suspend |
| Cancellable collection? | Yes | Yes | Yes | Yes |
| Rich operators? | Full library | Full library (it is a Flow) | Full library (it is a Flow) | Limited; convert with receiveAsFlow() |
The mental model

Flow = a function that produces values on demand.
StateFlow = an observable variable — always has a value.
SharedFlow = an observable event bus — configurable history.
Channel = a typed coroutine queue — one consumer per value.