Flow, StateFlow, SharedFlow & Channels
A complete deep dive into Kotlin's reactive stream primitives — how they work, how they differ, and exactly when to reach for each one.
Cold vs Hot Streams
Before diving into the types, this distinction is everything. It determines whether your data source activates on demand or runs independently of listeners.
Flow is cold. StateFlow, SharedFlow, and Channel are hot.
Mental model: Cold flow = a function call that generates a sequence on demand. Hot flow = a live radio broadcast you tune into. You can tune in anytime, but you only hear what's playing right now (unless the station has a replay buffer).
| Property | Flow (cold) | StateFlow (hot) | SharedFlow (hot) | Channel (hot) |
|---|---|---|---|---|
| Activates on collect? | Yes | No | No | No |
| Multiple collectors | Each gets own stream | Shared, same values | Shared, same events | Values split among collectors |
| Replays past values | All (cold restart) | Last value always | Configurable (0..n) | No replay |
| Has current state | No | Yes — .value | No | No |
| Backpressure handling | Suspend-based | Drops if slow collector | Configurable | Suspend / buffer |
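To make the cold/hot split concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The names `producerStarts`, `coldNumbers`, and `hotCounter` are illustrative only. It counts how often a cold producer block runs and shows that a StateFlow keeps its value even when nobody is collecting.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the cold producer block has been started.
val producerStarts = AtomicInteger(0)

// Cold: the block below runs once per collector, from the top.
val coldNumbers: Flow<Int> = flow {
    producerStarts.incrementAndGet()
    emit(1)
    emit(2)
    emit(3)
}

// Hot: holds a value whether or not anyone is collecting.
val hotCounter = MutableStateFlow(0)

fun main() = runBlocking {
    val first = coldNumbers.toList()   // runs the producer
    val second = coldNumbers.toList()  // runs it again from scratch
    println("$first $second starts=${producerStarts.get()}")

    hotCounter.value = 41              // nobody is listening; the update still happens
    hotCounter.value = 42
    println(hotCounter.first())        // a late collector sees only the current value
}
```

Each `toList()` call is a separate collection, so the counter goes up by one per call, while the late read of `hotCounter` never sees 41.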
Flow
The foundational reactive primitive. A cold, sequential stream of values — nothing runs until someone collects it, and each collector gets the full sequence from scratch.
A Flow<T> is conceptually a suspended function that emits multiple values. The producer block is defined using the flow { } builder, which gives you access to emit(). The entire producer block executes in a coroutine, on the collector's coroutine context (upstream operators can change this).
    // Creating a Flow: the producer block is NOT executed here
    fun temperatureReadings(): Flow<Double> = flow {
        while (true) {
            val temp = thermometer.read() // suspending call
            emit(temp)                    // suspends until the collector is ready
            delay(1000)
        }
    }

    // Collecting: NOW the producer block runs
    viewModelScope.launch {
        temperatureReadings()
            .map { it * 9 / 5 + 32 }  // Celsius → Fahrenheit
            .filter { it > 98.6 }     // only fevers
            .collect { temp -> Log.d("Fever", "$temp°F") }
    }

    // Two collectors = two independent producer executions
    val flow = temperatureReadings()
    launch { flow.collect { /* collector A: own coroutine */ } }
    launch { flow.collect { /* collector B: independent execution */ } }
Context preservation
By default, a Flow emits in the collector's coroutine context, so the producer runs on whatever dispatcher the collect call uses. Use flowOn(dispatcher) to move the upstream producer to a different context. For a flow { } builder, this is the supported way to change where the producer runs.

    fun searchDatabase(query: String): Flow<List<Result>> = flow {
        val results = db.search(query) // runs on IO
        emit(results)
    }.flowOn(Dispatchers.IO) // ← moves UPSTREAM to the IO dispatcher

    // The collect { } block always runs in the COLLECTOR's context (Main)
    viewModelScope.launch { // ← this is Main
        searchDatabase("kotlin")
            .collect { results ->
                binding.list.adapter = ResultAdapter(results) // safe: Main
            }
    }

    // ✗ WRONG: do NOT emit from inside withContext in a flow builder
    val badFlow = flow {
        withContext(Dispatchers.IO) {
            emit(fetchData()) // throws IllegalStateException when collected
        }
    }

Context preservation rule: A Flow must emit in the same context it was collected in. Calling emit from inside withContext in a flow { } builder violates this and throws an IllegalStateException. Apply flowOn to the flow from the outside instead.
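The rule is easy to check in a small runnable sketch. The function names below are made up for illustration, and the thread-name check relies on kotlinx.coroutines naming its Default dispatcher threads `DefaultDispatcher-worker-N`. It shows three cases: a producer moved with flowOn, withContext used only to compute a value, and emit called from inside withContext.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Emits the name of the thread the producer block runs on.
fun producerThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default) // upstream moves to the Default dispatcher

// withContext is fine for computing a value, as long as emit happens outside it.
fun computedOffContext(): Flow<Int> = flow {
    val value = withContext(Dispatchers.Default) { 21 * 2 }
    emit(value)
}

// Emitting from inside withContext breaks the invariant and fails on collection.
fun violating(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

fun main() = runBlocking {
    println("producer ran on: ${producerThread().first()}")
    println("collector runs on: ${Thread.currentThread().name}")
    println(computedOffContext().first())
    val failure = runCatching { violating().collect() }.exceptionOrNull()
    println("violation is IllegalStateException: ${failure is IllegalStateException}")
}
```

The second function is the useful nuance: switching context to produce a value is allowed, and only the emit call itself has to stay in the collector's context.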
Exception handling
    userFlow()
        .catch { e ->                 // catches UPSTREAM exceptions
            emit(User.empty())        // can emit a fallback value
            logger.error("Flow error", e)
        }
        .onEach { user ->             // side effect per emission
            analytics.track("user_loaded")
        }
        .onCompletion { cause ->      // always fires, on success or error
            if (cause != null) showError(cause)
        }
        .collect { user -> render(user) }

    // catch does NOT catch exceptions thrown in collect { }, only upstream ones.
    // For collect exceptions, wrap collect { } in try/catch:
    try {
        flow.collect { value ->
            if (value < 0) throw IllegalStateException("negative")
        }
    } catch (e: Exception) {
        handleError(e)
    }
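The upstream-only behavior of catch can be verified with a self-contained sketch. `flaky`, `withFallback`, and `collectorFailure` are hypothetical names, not part of any library. The first case recovers with a fallback value; the second shows a collector exception passing straight through catch.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// A flow that fails after its first value.
fun flaky(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("upstream failed")
}

// catch sits downstream of the failure, so it can substitute a fallback value.
suspend fun withFallback(): List<Int> =
    flaky().catch { emit(-1) }.toList()

// An exception thrown by the collector is not seen by catch; it escapes collect.
suspend fun collectorFailure(): Throwable? = runCatching {
    flowOf(1, 2)
        .catch { emit(-1) }
        .collect { if (it == 2) error("collector failed") }
}.exceptionOrNull()

fun main() = runBlocking {
    println(withFallback())               // [1, -1]
    println(collectorFailure()?.message)  // collector failed
}
```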
StateFlow
A hot flow that holds a single current value, always available synchronously. The reactive successor to LiveData — it's the standard for exposing UI state from a ViewModel.
Think of StateFlow as an observable variable. It always has a value (you must provide an initial one). When the value changes, all current collectors receive the new value. Late collectors immediately get the current value upon subscription.
Crucially, StateFlow has conflation: if a collector is slow and multiple values are emitted rapidly, it only receives the latest value — intermediate values are dropped. This makes it ideal for state representation but wrong for events.
    class ProfileViewModel : ViewModel() {
        // Private mutable: only the ViewModel can write
        private val _uiState = MutableStateFlow<ProfileState>(ProfileState.Loading)

        // Public read-only: the UI can only observe
        val uiState: StateFlow<ProfileState> = _uiState.asStateFlow()

        // Read the current value synchronously, no coroutine needed
        val currentState: ProfileState get() = _uiState.value

        fun loadProfile(id: String) {
            viewModelScope.launch {
                _uiState.update { ProfileState.Loading } // atomic update
                try {
                    val profile = repo.getProfile(id)
                    _uiState.update { ProfileState.Success(profile) }
                } catch (e: CancellationException) {
                    throw e // never swallow cancellation
                } catch (e: Exception) {
                    _uiState.update { ProfileState.Error(e.message) }
                }
            }
        }
    }

    // In a Fragment: lifecycle-safe collection tied to the view's lifecycle
    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
            viewModel.uiState.collect { state ->
                when (state) {
                    is ProfileState.Loading -> showLoading()
                    is ProfileState.Success -> render(state.profile)
                    is ProfileState.Error -> showError(state.message)
                }
            }
        }
    }
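Conflation and equality-based deduplication are easier to trust once observed. The sketch below uses hypothetical function names and assumes it runs inside a single-threaded runBlocking, so the plain mutable lists are safe. It pairs a deliberately slow collector with a burst of updates, then writes the same value twice.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A slow collector misses intermediate values and catches up with the latest one.
suspend fun slowCollectorSees(): List<Int> = coroutineScope {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val collector = launch {
        state.collect { value ->
            seen += value
            delay(100) // simulate slow rendering
        }
    }
    delay(10)                        // let the collector pick up the initial value
    for (i in 1..5) state.value = i  // rapid updates while the collector is busy
    delay(300)                       // give the collector time to catch up
    collector.cancel()
    seen
}

// Writing a value equal to the current one produces no emission.
suspend fun duplicatesSkipped(): List<String> = coroutineScope {
    val state = MutableStateFlow("a")
    val seen = mutableListOf<String>()
    val collector = launch { state.collect { seen += it } }
    delay(10)
    state.value = "a" // equal to the current value: ignored
    delay(10)
    state.value = "b"
    delay(10)
    collector.cancel()
    seen
}

fun main() = runBlocking {
    println(slowCollectorSees())  // starts with 0, ends with 5, skips values in between
    println(duplicatesSkipped())
}
```

The values 1 through 4 never reach the slow collector, which is exactly why a navigation or toast event sent through a StateFlow can silently disappear.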
stateIn — converting a cold Flow to StateFlow
stateIn is one of the most useful operators in production code. It converts a cold Flow (e.g., from Room) into a StateFlow with configurable sharing and lifecycle behavior.

    val users: StateFlow<List<User>> = repo.getAllUsers() // cold Flow from Room
        .map { it.filter { u -> u.isActive } }
        .stateIn(
            scope = viewModelScope,
            // WhileSubscribed(5000): keep upstream alive for 5s after the last
            // subscriber leaves, so it survives rotation (brief unsubscribe ~100ms)
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )

    // SharingStarted options:
    //   Eagerly: starts immediately, runs until the scope is cancelled
    //   Lazily: starts on first subscriber, runs until the scope is cancelled
    //   WhileSubscribed(stopTimeout): starts/stops with subscribers
    //     └── the 5000ms grace period prevents a restart on rotation
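What stateIn buys you is a single shared upstream. As a rough sketch (the function names are invented, and a plain CoroutineScope stands in for viewModelScope), the code below counts upstream starts for two independent cold collections and for two reads of one shared StateFlow.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Two plain collections of a cold flow run the producer twice.
suspend fun startsWhenCollectedTwice(): Int {
    val starts = AtomicInteger(0)
    val cold = flow { starts.incrementAndGet(); emit("loaded") }
    cold.collect()
    cold.collect()
    return starts.get()
}

// Two readers of one StateFlow share a single run of the producer.
suspend fun startsWhenShared(): Int {
    val starts = AtomicInteger(0)
    val cold = flow { starts.incrementAndGet(); emit("loaded") }
    val scope = CoroutineScope(Dispatchers.Default + Job()) // stand-in for viewModelScope
    val shared = cold.stateIn(scope, SharingStarted.Lazily, "loading")
    shared.first { it == "loaded" } // first reader triggers the upstream
    shared.first { it == "loaded" } // second reader reuses the cached value
    scope.cancel()
    return starts.get()
}

fun main() = runBlocking {
    println("cold starts: ${startsWhenCollectedTwice()}")
    println("shared starts: ${startsWhenShared()}")
}
```

SharingStarted.Lazily is used here only because it keeps the count deterministic; WhileSubscribed may restart the upstream once the last subscriber has been gone longer than the timeout.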
Channel
A coroutine primitive for communicating between coroutines. Unlike flows, channels are not multicast: each value is consumed by exactly one receiver. Think of it as a typed queue whose operations suspend instead of blocking a thread.
A Channel is essentially the coroutine counterpart of a concurrent blocking queue. send() puts a value in; receive() takes one out. Both suspend rather than block: send suspends if the buffer is full; receive suspends if the channel is empty. This is the fundamental coroutine mechanism for the producer-consumer pattern.
Channels vs SharedFlow: When two coroutines share a Channel, only one of them receives each value. With SharedFlow, all subscribers see every emission. This makes Channels the right tool for work distribution (task queues, pipelines) and SharedFlow right for observation.
    // Basic channel: producer/consumer
    val channel = Channel<Int>(Channel.BUFFERED) // default buffer of 64

    // Producer coroutine
    launch {
        for (i in 1..100) {
            channel.send(i) // suspends if the buffer is full
        }
        channel.close() // signal completion
    }

    // Consumer coroutine: for-loop until the channel is closed
    launch {
        for (value in channel) {
            processItem(value)
        }
    }

    // Multiple consumers: values are SPLIT between them (not broadcast)
    repeat(4) { workerId ->
        launch {
            for (task in channel) {
                Log.d("Worker", "Worker $workerId processing task $task")
            }
        }
    }
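The difference between splitting and broadcasting shows up clearly side by side. This is a minimal sketch with illustrative function names. Two workers drain one Channel, and two subscribers observe one MutableSharedFlow.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Channel: each value goes to exactly one of the two workers.
suspend fun channelSplit(): List<List<Int>> = coroutineScope {
    val channel = Channel<Int>()
    val workers = List(2) {
        async {
            val got = mutableListOf<Int>()
            for (x in channel) got += x // loop ends when the channel is closed
            got
        }
    }
    for (i in 1..6) channel.send(i)
    channel.close()
    workers.awaitAll()
}

// SharedFlow: every subscriber sees every emission.
suspend fun sharedBroadcast(): List<List<Int>> = coroutineScope {
    val shared = MutableSharedFlow<Int>()
    val subscribers = List(2) { async { shared.take(3).toList() } }
    shared.subscriptionCount.first { it == 2 } // wait until both are subscribed
    for (i in 1..3) shared.emit(i)
    subscribers.awaitAll()
}

fun main() = runBlocking {
    println(channelSplit())    // the six values are divided between the workers
    println(sharedBroadcast()) // both lists contain 1, 2, 3
}
```

How the channel values are divided depends on scheduling, but together the two workers always account for each value exactly once.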
Channel capacity types
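The capacity argument decides what happens when a sender gets ahead of the receiver. The sketch below probes four common settings with the non-suspending trySend. `capacityDemo` is a made-up helper that collects the observations into a list.

```kotlin
import kotlinx.coroutines.channels.Channel

fun capacityDemo(): List<Any?> {
    // RENDEZVOUS (capacity 0, the default): a send succeeds only if a receiver is waiting.
    val rendezvous = Channel<Int>()
    val rendezvousAccepted = rendezvous.trySend(1).isSuccess

    // Fixed capacity: sends succeed until the buffer is full.
    val buffered = Channel<Int>(capacity = 2)
    val bufferedAccepted = List(3) { buffered.trySend(it).isSuccess }

    // CONFLATED: keeps only the most recent element, so sending never has to wait.
    val conflated = Channel<Int>(Channel.CONFLATED)
    repeat(3) { conflated.trySend(it) } // sends 0, 1, 2
    val conflatedLatest = conflated.tryReceive().getOrNull()

    // UNLIMITED: the buffer grows as needed, so sending never has to wait.
    val unlimited = Channel<Int>(Channel.UNLIMITED)
    val unlimitedAccepted = (1..1000).all { unlimited.trySend(it).isSuccess }

    return listOf(rendezvousAccepted, bufferedAccepted, conflatedLatest, unlimitedAccepted)
}

fun main() {
    println(capacityDemo()) // [false, [true, true, false], 2, true]
}
```

With the suspending send() in place of trySend, the two rejected sends would suspend until a receiver makes room instead of failing.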
produce { } — channel as a Flow
    // produce { }: launches a coroutine and returns its ReceiveChannel.
    // It is an extension on CoroutineScope, so call it from within a scope.
    val numbers: ReceiveChannel<Int> = produce {
        for (i in 1..10) {
            send(i)
            delay(100)
        }
    }

    // channelFlow { }: Flow API with Channel semantics.
    // Bridge: you need Flow's operators but Channel's concurrent sends.
    fun mergedSensors(): Flow<SensorData> = channelFlow {
        launch { sensor1.readings().collect { send(it) } }
        launch { sensor2.readings().collect { send(it) } }
        launch { sensor3.readings().collect { send(it) } }
        // All three sensors feed into one Flow concurrently
    }

    // Note: regular flow { } can only call emit() from one coroutine.
    // channelFlow { } allows concurrent send() from multiple coroutines.
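A runnable variant of the sensor merge might look like the following. The `sensor` function and its name, count, and period parameters are hypothetical stand-ins for real sensor APIs.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical sensor: emits `count` readings tagged with its name.
fun sensor(name: String, count: Int, periodMs: Long): Flow<String> = flow {
    repeat(count) { i ->
        delay(periodMs)
        emit("$name#$i")
    }
}

// channelFlow lets several child coroutines send into one downstream Flow.
fun mergedReadings(): Flow<String> = channelFlow {
    launch { sensor("temp", 3, 10).collect { send(it) } }
    launch { sensor("humidity", 2, 15).collect { send(it) } }
    // The resulting flow completes once both child coroutines have finished.
}

fun main() = runBlocking {
    mergedReadings().collect { println(it) } // interleaving depends on timing
}
```

The order of the merged readings is not guaranteed, but each sensor's own readings stay in order and all five values arrive.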
Flow Operators
Operators are the transformation layer between producer and consumer. They form a pipeline where each operator creates a new Flow wrapping the previous one.
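[Diagram: the operator pipeline, running from the flow { } producer through a transform stage, a predicate stage, and a context stage to a terminal operator.]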
    val searchFlow: Flow<String> = searchField.textChanges()

    searchFlow
        // Transformation
        .map { query -> query.trim().lowercase() }
        .filter { it.length >= 3 }      // only non-trivial queries
        .distinctUntilChanged()         // skip identical consecutive values
        // Timing
        .debounce(300)                  // wait for 300ms of inactivity
        // Switching: cancel the previous search on a new query
        .flatMapLatest { query ->       // ← KEY for search!
            repo.search(query)          // cancels the previous call
        }
        // Error handling
        .catch { e -> emit(emptyList()) }
        // Side effects (don't transform)
        .onEach { results -> analytics.log(results.size) }
        .onStart { showLoading() }
        .onCompletion { hideLoading() }
        // Terminal
        .collect { results -> adapter.submitList(results) }
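The transformation stage of that pipeline can be exercised in isolation. The sketch below pulls the first three operators into a hypothetical `normalizedQueries` function and feeds it fixed input, leaving out the time-based debounce so the result is deterministic.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Each operator wraps the previous flow; nothing runs until a terminal operator is called.
fun normalizedQueries(raw: Flow<String>): Flow<String> = raw
    .map { it.trim().lowercase() }   // " Kotlin " becomes "kotlin"
    .filter { it.length >= 3 }       // drops "ko"
    .distinctUntilChanged()          // drops consecutive repeats

fun main() = runBlocking {
    val input = flowOf(" Kotlin ", "kotlin", "ko", "FLOW ", "flow")
    println(normalizedQueries(input).toList()) // [kotlin, flow]
}
```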
The flatMap family — concurrency control
These are the most powerful and most nuanced operators. They all take each emitted value and produce an inner Flow from it — but differ in how they handle concurrent inner Flows:
| Operator | When new value arrives | Use case |
|---|---|---|
| flatMapLatest | Cancels previous inner Flow, starts new one | Search, live queries — only care about latest |
| flatMapConcat | Queues — waits for previous to complete first | Ordered operations, sequential processing |
| flatMapMerge | Runs inner Flows concurrently (up to the concurrency limit, 16 by default) | Parallel downloads, fire-and-collect |
| transformLatest | Like flatMapLatest but with arbitrary emit logic | Complex transformations that need cancellation |
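The table rows can be checked against a small experiment. This sketch uses illustrative function names and arbitrary delays. It applies three of the operators to tiny inputs; the opt-in annotations are there because these operators are marked experimental or preview, depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// flatMapConcat: inner flows run one after another, in order.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun concatResult(): List<String> =
    flowOf(1, 2).flatMapConcat { n -> flowOf("$n-a", "$n-b") }.toList()

// flatMapLatest: a new upstream value cancels the inner flow still in progress.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun latestResult(): List<Int> =
    flowOf(1, 2, 3).flatMapLatest { n ->
        flow {
            delay(50) // cancelled for 1 and 2 before this delay finishes
            emit(n)
        }
    }.toList()

// flatMapMerge: inner flows run concurrently, so results arrive in completion order.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun mergeResult(): List<Int> =
    flowOf(1, 2, 3).flatMapMerge { n ->
        flow {
            delay(30L * (4 - n)) // later values finish sooner
            emit(n)
        }
    }.toList()

fun main() = runBlocking {
    println(concatResult()) // [1-a, 1-b, 2-a, 2-b]
    println(latestResult()) // [3]
    println(mergeResult())  // all three values, ordered by completion time
}
```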
Combining Flows
Real apps need multiple data sources coordinated together. These operators merge, zip, and interleave streams.
    // combine: once every flow has emitted at least once, emits whenever
    // ANY of them emits (using the latest value from each)
    combine(userFlow, settingsFlow) { user, settings ->
        UiModel(user.name, settings.theme)
    }.collect { model -> render(model) }

    // zip: pairs emissions by index, waiting for both sides to emit
    flowA.zip(flowB) { a, b -> "$a-$b" }
    // flowA: 1, 2, 3   flowB: a, b, c   result: "1-a", "2-b", "3-c"

    // merge: interleaves N flows (ordered by arrival time)
    merge(flow1, flow2, flow3).collect { value -> handle(value) }

    // Real example: combine user + subscription + features
    val dashboard: StateFlow<DashboardState> = combine(
        repo.getUser(),
        repo.getSubscription(),
        repo.getFeatureFlags()
    ) { user, sub, flags ->
        DashboardState(
            userName = user.name,
            isPro = sub.isPro,
            showNewFeature = flags.newDashboard
        )
    }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), DashboardState.Loading)
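A quick way to feel the difference between these operators is to give them inputs of unequal length. The function names below are illustrative only.

```kotlin
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs by position, so equal-length inputs pair up completely.
suspend fun zipped(): List<String> =
    flowOf(1, 2, 3).zip(flowOf("a", "b", "c")) { n, s -> "$n-$s" }.toList()

// zip stops as soon as either side completes.
suspend fun zipStopsAtShorter(): List<String> =
    flowOf(1, 2, 3).zip(flowOf("a")) { n, s -> "$n-$s" }.toList()

// combine keeps reusing the latest value of the other side, so it ends on 3 and "a".
suspend fun lastCombined(): String =
    combine(flowOf(1, 2, 3), flowOf("a")) { n, s -> "$n-$s" }.last()

// merge forwards every value from every input.
suspend fun merged(): List<Int> =
    merge(flowOf(1, 2), flowOf(3, 4)).toList()

fun main() = runBlocking {
    println(zipped())            // [1-a, 1-b paired by index: 1-a, 2-b, 3-c]
    println(zipStopsAtShorter()) // [1-a]
    println(lastCombined())      // 3-a
    println(merged())            // all four values; interleaving may vary
}
```

Only the final combine result is shown because its earlier emissions depend on which input happens to deliver first.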
Android Patterns
Flows become dangerous if collected outside a lifecycle-aware scope. These patterns keep your app safe and efficient.
The golden rule: repeatOnLifecycle
Never use lifecycleScope.launch { flow.collect { } } alone — this keeps collecting even when the Activity is stopped (in the background), wasting CPU and potentially crashing when touching UI that's been destroyed. Always wrap in repeatOnLifecycle:
    // ✗ UNSAFE: keeps collecting in the background
    lifecycleScope.launch {
        viewModel.state.collect { render(it) }
    }

    // ✓ SAFE: pauses when STOPPED, resumes when STARTED
    lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            // Collect multiple flows concurrently within the same lifecycle
            launch { viewModel.uiState.collect { render(it) } }
            launch { viewModel.events.collect { handleEvent(it) } }
        }
    }

    // ✓ COMPOSE: lifecycle-aware collection
    val state by viewModel.uiState.collectAsStateWithLifecycle()
    // ↑ stops collection when the UI is not visible
    //   USE THIS, not .collectAsState()
The complete ViewModel pattern
    class ArticleViewModel(
        private val repo: ArticleRepository,
        private val savedState: SavedStateHandle
    ) : ViewModel() {

        // State: use StateFlow, exposed read-only
        @OptIn(ExperimentalCoroutinesApi::class) // flatMapLatest is experimental
        val articles: StateFlow<ArticleState> =
            savedState.getStateFlow("filter", Filter.All)
                .flatMapLatest { filter -> repo.getArticles(filter) }
                // Widen to ArticleState so catch can emit the Error variant
                .map { ArticleState.Success(it) as ArticleState }
                .catch { emit(ArticleState.Error(it.message)) }
                .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), ArticleState.Loading)

        // One-shot events: use SharedFlow with replay = 0
        private val _events = MutableSharedFlow<UiEvent>(extraBufferCapacity = 1)
        val events = _events.asSharedFlow()

        fun onArticleClicked(article: Article) {
            viewModelScope.launch {
                _events.emit(UiEvent.Navigate("/article/${article.id}"))
            }
        }

        fun onBookmarkClicked(article: Article) {
            viewModelScope.launch {
                repo.toggleBookmark(article.id)
                _events.tryEmit(UiEvent.ShowSnackbar("Bookmarked"))
            }
        }
    }
Decision rule: Need current state in the ViewModel readable synchronously? → StateFlow. Need to fire one-shot events (navigate, toast) that shouldn't replay on rotation? → SharedFlow(replay=0). Need to transform a Repository cold Flow? → pipe it through stateIn/shareIn. Need a work queue between coroutines? → Channel.
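The state-versus-event half of that rule comes down to what a late subscriber sees. The sketch below uses hypothetical function names and a 100 ms timeout chosen only for the demo. It emits before anyone is listening and then subscribes.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// With replay = 0, an event emitted before anyone subscribes is gone.
suspend fun lateEventSubscriberSees(): String? {
    val events = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 1)
    events.tryEmit("navigate")                       // no subscribers: the value is dropped
    return withTimeoutOrNull(100) { events.first() } // nothing arrives, so this yields null
}

// A StateFlow always hands its current value to a late collector.
suspend fun lateStateCollectorSees(): String {
    val state = MutableStateFlow("idle")
    state.value = "navigate"
    return state.first()
}

fun main() = runBlocking {
    println(lateEventSubscriberSees()) // null
    println(lateStateCollectorSees())  // navigate
}
```

The first result is the flip side of not replaying on rotation: an event fired while no collector is active is lost, which is worth keeping in mind when events are emitted while the UI is stopped.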
Full Comparison
The complete decision matrix — when to reach for each primitive.
| Question | Flow | StateFlow | SharedFlow | Channel |
|---|---|---|---|---|
| Cold or hot? | Cold | Hot | Hot | Hot |
| Has initial value? | No | Required | No | No |
| Read .value sync? | No | Yes | No | No |
| Multiple subscribers? | Each gets own | All shared | All shared | Values split |
| Replay for late subs? | All (cold start) | Last value | Configurable | None |
| Equality dedup? | No | Yes (skips same) | No | No |
| Use for UI state? | Via stateIn | Primary choice | Not ideal | No |
| Use for one-shot events? | No | No (replays!) | Yes (replay=0) | Yes |
| Use for work queues? | No | No | No | Primary choice |
| Backpressure? | Suspend-based | Conflates | Configurable | Buffer + suspend |
| Cancellable collection? | Yes | Yes | Yes | Yes |
| Rich operators? | Full library | Full library (it is a Flow) | Full library (it is a Flow) | Limited; convert with receiveAsFlow() |
Flow = a function that produces values on demand.
StateFlow = an observable variable — always has a value.
SharedFlow = an observable event bus — configurable history.
Channel = a typed coroutine queue — one consumer per value.