The power of ReactiveSwift lies within its operators!

TL;DR

  • SignalProducer lets you model asynchronous units of work that can be started and composed.
  • flatten turns a producer-of-producers into a single stream of values.
  • Pick a strategy based on behavior: latest (keep only the newest work), concat (run work serially), merge (run work concurrently and interleave the results).

Why the flatten operator matters

The power of ReactiveSwift lies in its operators. In this post you will:

  1. See a callback-based solution for running an asynchronous task several times.
  2. Replace it with a SignalProducer-based solution.
  3. Learn how the flatten operator and its strategies latest, concat, and merge change the behavior of your pipelines.

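All examples use ReactiveSwift and Result. The NoError type comes from the Result library; recent ReactiveSwift releases use Swift's built-in Never in its place.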

import Result
import ReactiveSwift
import Foundation

The problem: run an async task N times, in order

Imagine your product manager gives you a function that does some work asynchronously:

func superSecretFunc(_ text: String, completion: @escaping () -> Void) {
    DispatchQueue.global(qos: .userInitiated).async {
        let diceRoll = Int(arc4random_uniform(99) + 1)
        // usleep takes microseconds, so scale the roll up to milliseconds.
        usleep(UInt32(diceRoll) * 1000)
        print("Ran \(text) on thread \(Thread.current) for \(diceRoll) milliseconds")
        completion()
    }
}

The requirement: run superSecretFunc several times, one after another, in order.

Naive callback solution

The straightforward way is to nest callbacks:

class CrappySolution1 {

    public func runTasks() {
        print("Solution 1 ...")
        superSecretFunc("1") {
            superSecretFunc("2") {
                superSecretFunc("3") { }
            }
        }

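        // Crude keep-alive so execution does not move on before the async calls finish
        // (three calls of at most 99 ms each fit within 0.3 s).
        Thread.sleep(forTimeInterval: 0.3)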
    }
}

let so1 = CrappySolution1()
so1.runTasks()

This works, but even with three calls the nesting is already ugly. A sample run prints:

Solution 1 ...
Ran 1 on thread <NSThread ...> for 64 milliseconds
Ran 2 on thread <NSThread ...> for 97 milliseconds
Ran 3 on thread <NSThread ...> for 73 milliseconds

Scaling the naive approach

What happens when the PM asks you to run it five times?

class CrappySolution2 {

    public func runTasks() {
        print("Solution 2 ...")
        superSecretFunc("1") {
            superSecretFunc("2") {
                superSecretFunc("3") {
                    superSecretFunc("4") {
                        superSecretFunc("5") {
                        }
                    }
                }
            }
        }
    }
}

let so2 = CrappySolution2()
so2.runTasks()

The code still works, but the indentation and nesting keep growing. If the requirement changes to 7, 9 or 100 times, the callback approach quickly becomes unmaintainable.

That is where ReactiveSwift comes to the rescue.

A better approach with SignalProducer

In ReactiveSwift, a SignalProducer<Value, Error> represents a description of work that can be started later. Each time you start it, it can:

  • Send one or more value events.
  • Eventually send completed or failed.
  • Be composed with other producers using operators.
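To make these points concrete, here is a minimal sketch (not part of the original post) of a producer that is started twice. The names runs, greeter, and received are made up for illustration, and the code assumes the same Result-based NoError setup as the rest of the examples.

```swift
import Result
import ReactiveSwift

// Counts how many times the producer's work has been started.
var runs = 0

// Nothing happens when the producer is created; the closure runs once per start.
let greeter = SignalProducer<String, NoError> { observer, _ in
    runs += 1
    observer.send(value: "run \(runs)")
    observer.sendCompleted()
}

var received = [String]()

// Each start re-runs the closure, so the work is performed twice.
greeter.startWithValues { received.append($0) }
greeter.startWithValues { received.append($0) }

print(received) // ["run 1", "run 2"]
```

Because the closure here sends its value synchronously, both starts have finished by the time print runs.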

We can model each call to superSecretFunc as a SignalProducer<Void, NoError>:

class ReactiveSolution1 {

    public func runTasks(times: UInt) {
        print("Reactive Solution 1 ...")
        var producers = [SignalProducer<Void, NoError>]()

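        // The range 1...0 would trap at runtime, so bail out when there is nothing to run.
        guard times > 0 else { return }

        for i in 1...times {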
            let sp = SignalProducer<Void, NoError> { observer, _ in
                superSecretFunc(String(i)) {
                    observer.send(value: ())
                    observer.sendCompleted()
                }
            }
            producers.append(sp)
        }

        let fsp = SignalProducer<SignalProducer<Void, NoError>, NoError>(producers)

        fsp.flatten(.concat)
            .on(completed: {
                print("Done!")
            })
            .start()
    }
}

let rso1 = ReactiveSolution1()
rso1.runTasks(times: 7)

Now you can easily change times to 100 without changing any control flow code. The key operator here is flatten.
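As a side note, the "build an array of producers, then flatten" pattern can usually be written more compactly with flatMap, which maps each value to a producer and flattens in one step. The sketch below is an illustration rather than the post's own code: task is a hypothetical, synchronous stand-in for superSecretFunc so the result is easy to inspect.

```swift
import Result
import ReactiveSwift

// Hypothetical stand-in for superSecretFunc: emits its index and completes right away.
func task(_ index: Int) -> SignalProducer<Int, NoError> {
    return SignalProducer { observer, _ in
        observer.send(value: index)
        observer.sendCompleted()
    }
}

var order = [Int]()

// The outer producer emits 1 through 5; flatMap(.concat) turns each number
// into a task and runs the tasks one after another.
SignalProducer<Int, NoError>(1...5)
    .flatMap(.concat) { task($0) }
    .startWithValues { order.append($0) }

print(order) // [1, 2, 3, 4, 5]
```

Swapping .concat for .merge or .latest changes the scheduling without touching the rest of the pipeline.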

The flatten operator

flatten is used when you have a producer of producers:

SignalProducer<SignalProducer<Value, Error>, Error>

It turns this into a single producer:

SignalProducer<Value, Error>

The behavior is controlled by the FlattenStrategy you pass to flatten:

  1. latest
  2. concat
  3. merge

Each strategy answers the question: when several inner producers are involved, which ones are allowed to send values, and when does the whole chain complete?

latest

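  • Starts each inner producer as soon as it arrives.
  • Only the most recently started inner producer is allowed to send values.
  • When a new inner producer starts, the previous one is disposed of, so any values it would have sent later are dropped.
  • Completes when the outer producer and the latest inner producer have completed.
  • Useful for things like “search as you type”: only the latest request matters.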
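Here is a rough sketch of that "only the newest one counts" behavior. It is not from the original post: it uses Signal.pipe() to control by hand when each inner stream sends, and all names (requests, firstSearch, secondSearch, results) are hypothetical.

```swift
import Result
import ReactiveSwift

// Outer stream of "requests"; each request is itself a producer of results.
let (requests, requestsObserver) = Signal<SignalProducer<String, NoError>, NoError>.pipe()

// Two manually driven inner streams standing in for two search requests.
let (firstSearch, firstObserver) = Signal<String, NoError>.pipe()
let (secondSearch, secondObserver) = Signal<String, NoError>.pipe()

var results = [String]()
requests.flatten(.latest).observeValues { results.append($0) }

// The first request starts and delivers a result.
requestsObserver.send(value: SignalProducer<String, NoError>(firstSearch))
firstObserver.send(value: "results for re")

// A newer request arrives, so the first one is disposed of.
requestsObserver.send(value: SignalProducer<String, NoError>(secondSearch))
firstObserver.send(value: "late results for re") // dropped
secondObserver.send(value: "results for reactive")

print(results) // ["results for re", "results for reactive"]
```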

concat

  • Queues inner producers and starts them one after another.
  • The next producer does not start until the current one completes.
  • The flattened producer completes once the outer producer and every inner producer have completed.
  • Perfect when you want asynchronous work to run strictly in sequence.

You can explore a marble diagram for concat here:
Visualization

merge

  • Starts all inner producers as they arrive.
  • Values from all inner producers are interleaved into a single stream.
  • Completes when the outer producer and all inner producers complete.
  • Good when you care about every result, regardless of order.

There is also a marble diagram for merge:
Visualization
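To see the interleaving and the completion rule for merge, here is a small sketch, again an illustration and not the author's code. It uses Signal.pipe() so the two inner streams can be driven by hand; left, right, merged, and didComplete are made-up names.

```swift
import Result
import ReactiveSwift

// Two manually driven inner streams.
let (left, leftObserver) = Signal<String, NoError>.pipe()
let (right, rightObserver) = Signal<String, NoError>.pipe()

let leftProducer = SignalProducer<String, NoError>(left)
let rightProducer = SignalProducer<String, NoError>(right)

// The outer producer emits both inner producers and then completes.
let outer = SignalProducer<SignalProducer<String, NoError>, NoError>([leftProducer, rightProducer])

var merged = [String]()
var didComplete = false

outer.flatten(.merge)
    .on(completed: { didComplete = true }, value: { merged.append($0) })
    .start()

// Both inner producers are already running, so values arrive in send order.
leftObserver.send(value: "L1")
rightObserver.send(value: "R1")
leftObserver.send(value: "L2")

leftObserver.sendCompleted()
let completedEarly = didComplete // still false: right has not completed yet
rightObserver.sendCompleted()    // now every inner producer is done

print(merged)                      // ["L1", "R1", "L2"]
print(completedEarly, didComplete) // false true
```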

Seeing latest, concat, and merge in action

The following example uses three simple producers and then flattens them in different ways:

// s1 and s2 deliberately never complete, to show how each strategy reacts.
let s1 = SignalProducer<String, NoError> { observer, _ in
    observer.send(value: "1")
//    observer.sendCompleted()
}
let s2 = SignalProducer<String, NoError> { observer, _ in
    observer.send(value: "2")
//    observer.sendCompleted()
}
// s3 is the only producer that completes.
let s3 = SignalProducer<String, NoError> { observer, _ in
    observer.send(value: "3")
    observer.sendCompleted()
}

let s5 = SignalProducer<SignalProducer<String, NoError>, NoError>([s1, s2, s3])

print("\n ---> latest")
s5.flatten(.latest)
    .on(completed: {
        print("latest completed")
    },
        value: { value in
            print(value)
    }).start()

print("\n ---> concat")
print("concat never completes because it is waiting for all inputs (producers) to complete")
s5.flatten(.concat)
    .on(completed: {
        // will never complete
        print("concat completed")
    },
        value: { value in
            print(value)
    }).start()

print("\n ---> merge")
print("like concat, merge never completes because it is waiting for all inputs (producers) to complete")
print("unlike concat, merge will send all the values from each input")
s5.flatten(.merge)
    .on(completed: {
        // will never complete
        print("merge completed")
    },
        value: { value in
            print(value)
    }).start()

Sample output might look like this:

 ---> latest
1
2
3
latest completed

 ---> concat
concat never completes because it is waiting for all inputs (producers) to complete
1

 ---> merge
like concat, merge never completes because it is waiting for all inputs (producers) to complete
unlike concat, merge will send all the values from each input
1
2
3
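The concat run stops after 1 because s1 never completes, so s2 and s3 are never started. The latest run still completes because each new inner producer replaces the previous one, and s3, the last one, does complete. As a quick sketch of what happens once every inner producer completes (emit and log are hypothetical names, not part of the original code):

```swift
import Result
import ReactiveSwift

// Hypothetical helper: a producer that sends one value and then completes.
func emit(_ text: String) -> SignalProducer<String, NoError> {
    return SignalProducer { observer, _ in
        observer.send(value: text)
        observer.sendCompleted()
    }
}

let all = SignalProducer<SignalProducer<String, NoError>, NoError>([emit("1"), emit("2"), emit("3")])

var log = [String]()

// Every inner producer completes, so concat can move on to the next one
// and finally complete itself.
all.flatten(.concat)
    .on(completed: { log.append("completed") }, value: { log.append($0) })
    .start()

print(log) // ["1", "2", "3", "completed"]
```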

Summary

  • Callback-based code quickly becomes unreadable when you need to compose many asynchronous steps.
  • SignalProducer lets you model “a unit of work” that can be started and composed.
  • flatten turns a producer-of-producers into a single stream of values.
  • Different strategies (latest, concat, merge) give you different behavior:
    • latest → keep only the most recent work.
    • concat → run work serially, one after another.
    • merge → run work concurrently and merge all results.

You can find the original source code here:
source code
