Aggregation

Today I wanted to look at an approach for producing aggregate data from multiple measurements over a source. I’m learning Kotlin at the moment so I’ll use that for the examples in this post, but we can apply the same idea to pretty much any language (I’ve used similar approaches in F#, and it would work with C# albeit with a bit more code noise). Any feedback on the approach in general and on my Kotlin-ing attempts is appreciated.

Motivating example

For this post we’ll consider the example of a list of Sample values we want aggregate information for. A Sample includes the month and year it was collected, and an integer representing the value sampled. For each set of samples we are required to show the following information:

  • The total value sampled for each month and year
  • The earliest sample date in this data set
  • The largest individual sample collected
  • A count of how many samples were within a specific range.

data class MonthYear(val year: Int, val month: Int) : Comparable<MonthYear> { /* ... */ }
data class Sample(val date: MonthYear, val value: Int)

Initial attempts

We could neatly get each individual bit of information by using multiple queries (see note 1), but iterating over the data once per query seems quite wasteful, especially for larger data sets. Instead we could use multiple variables, or an aggregate type containing those variables, and update each as we loop or fold over the data set:

data class CandidateAggregate(var data: Map<MonthYear, Int>,
                              var earliestSampleDate: MonthYear?,
                              var largestSample: Int,
                              var inRangeCount: Int)

val result = samples.fold(
        CandidateAggregate(emptyMap(), null, 0, 0), // empty case
        { acc, s ->
            CandidateAggregate(
                acc.data.insertOrUpdate(s.date) { if (it==null) s.value else it + s.value },
                minOf(acc.earliestSampleDate ?: s.date, s.date),
                maxOf(acc.largestSample, s.value),
                acc.inRangeCount + if ((100..200).contains(s.value)) 1 else 0
            )
        })

/** Helper for updating the value for a key in a map, or inserting it if it does not exist. */
private fun <K, V> Map<K, V>.insertOrUpdate(key: K, transform: (V?) -> V): Map<K, V> =
        plus(key to transform(get(key)))

This seems a reasonable approach to me, and we’ll take this and adapt it in an attempt to get a few additional benefits:

  • Include more information about the type of calculation used for each field in the aggregate
  • Enable reuse of specific calculations in other aggregates
  • Enable independent testing of each calculation type
  • Make it fairly simple to change existing aggregates, and to create new ones.

Representing aggregate calculations

First we’ll create a type to represent values that can be combined. We’ll use Kotlin’s plus operator for this purpose.

/** A type [T] with an associative binary operation. Must satisfy the associative property:  `a + (b + c) == (a + b) + c` */
interface Semigroup<T> {
    operator fun plus(other: T): T
}

We’ll steal the term "semigroup" from mathematics as its definition includes the constraints our plus operation needs (see note 2), although we could also call it Combinable or Addable or something else if we prefer.

If you haven’t used Kotlin before, defining a plus operator function lets us also use the + symbol, so a + b will get translated to a.plus(b). Whenever you see two semigroups being added using + for the remainder of this post, keep in mind it will be calling the plus function defined by that semigroup instance. (If you don’t like co-opting + in this way, feel free to change the interface to declare fun combine(other: T): T or similar.)
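To make the operator translation concrete, here is a minimal sketch. The Text type is a hypothetical instance invented for this illustration (it is not one of the aggregation types used in this post); it simply concatenates strings, which is an associative operation.

```kotlin
/** A type [T] with an associative binary operation. */
interface Semigroup<T> {
    operator fun plus(other: T): T
}

/** Hypothetical example instance: string concatenation is associative. */
data class Text(val value: String) : Semigroup<Text> {
    override operator fun plus(other: Text): Text = Text(value + other.value)
}

fun main() {
    val a = Text("ab")
    val b = Text("cd")
    val c = Text("ef")

    // `a + b` is compiled as `a.plus(b)`, so both spellings give the same result.
    println(a + b == a.plus(b))          // true

    // Associativity: regrouping the operands does not change the result.
    println(a + (b + c) == (a + b) + c)  // true
}
```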

Next, we’ll define instances that represent sum, max, and min aggregation:

data class Sum(val value: Int) : Semigroup<Sum> {
    override fun plus(other: Sum): Sum = Sum(value + other.value)
}

data class Max<T : Comparable<T>>(val value: T) : Semigroup<Max<T>> {
    override operator fun plus(other: Max<T>) = Max(maxOf(value, other.value))
}

data class Min<T : Comparable<T>>(val value: T) : Semigroup<Min<T>> {
    override operator fun plus(other: Min<T>) = Min(minOf(value, other.value))
}
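As a quick illustration of how these three instances behave, the sketch below repeats the definitions so it can be run on its own, then combines a pair of values with each. The literal values are arbitrary.

```kotlin
interface Semigroup<T> {
    operator fun plus(other: T): T
}

data class Sum(val value: Int) : Semigroup<Sum> {
    override operator fun plus(other: Sum): Sum = Sum(value + other.value)
}

data class Max<T : Comparable<T>>(val value: T) : Semigroup<Max<T>> {
    override operator fun plus(other: Max<T>): Max<T> = Max(maxOf(value, other.value))
}

data class Min<T : Comparable<T>>(val value: T) : Semigroup<Min<T>> {
    override operator fun plus(other: Min<T>): Min<T> = Min(minOf(value, other.value))
}

fun main() {
    println(Sum(3) + Sum(4))           // Sum(value=7)
    println(Max(3) + Max(4))           // Max(value=4)
    // Max and Min work for any Comparable type, for example String.
    println(Min("pear") + Min("fig"))  // Min(value=fig)
}
```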

Looking at our CandidateAggregate from earlier, we also need to handle nullable values (earliestSampleDate: MonthYear?), as well as combining Map<MonthYear, Int> values. Rather than building these specifically for this case, we can express these concepts more generally in terms of other semigroups, so they can be reused for different cases:

/**
 * Combine nullable values. Use the semigroup instance to combine if both have values, or if only
 * one value is present use that.
 */
data class Nullable<T : Semigroup<T>>(val value: T?) : Semigroup<Nullable<T>> {
    override fun plus(other: Nullable<T>): Nullable<T> =
            if (value != null && other.value != null) {
                Nullable(value + other.value) // Reminder: `+` here will call T.plus defined for the Semigroup<T>.
            } else {
                Nullable(this.value ?: other.value)
            }
}
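A small usage sketch may help show the three cases Nullable has to handle (both present, one present, neither present). The definitions are repeated so the example is self-contained, and Nullable is written here with local variables rather than smart casts on properties, which is a stylistic variation and not a change in behaviour.

```kotlin
interface Semigroup<T> {
    operator fun plus(other: T): T
}

data class Min<T : Comparable<T>>(val value: T) : Semigroup<Min<T>> {
    override operator fun plus(other: Min<T>): Min<T> = Min(minOf(value, other.value))
}

data class Nullable<T : Semigroup<T>>(val value: T?) : Semigroup<Nullable<T>> {
    override operator fun plus(other: Nullable<T>): Nullable<T> {
        val left = value
        val right = other.value
        return if (left != null && right != null) {
            Nullable(left + right)   // both present: defer to T's own plus
        } else {
            Nullable(left ?: right)  // at most one present: keep whichever exists
        }
    }
}

fun main() {
    val none = Nullable<Min<Int>>(null)
    val five = Nullable(Min(5))
    val two = Nullable(Min(2))

    println(none + five == five)  // true: only the right side has a value
    println(five + none == five)  // true: only the left side has a value
    println(five + two == two)    // true: both present, Min picks 2
    println(none + none == none)  // true: nothing to combine
}
```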

/**
 * Merge [Map]s where the values have a semigroup instance. If both maps have an entry for the same key, these
 * will be combined using the semigroup operation.
 */
data class Mapped<K, V : Semigroup<V>>(val value: Map<K, V>) : Semigroup<Mapped<K, V>>, Map<K, V> by value {
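    override fun plus(other: Mapped<K, V>): Mapped<K, V> =
            // Fold the other map's entries into this one, so that for shared keys the value
            // from this map stays on the left of `+` and the value from `other` on the right.
            other.entries.fold(this) { acc, entry ->
                Mapped(acc.insertOrUpdate(entry.key) { if (it != null) it + entry.value else entry.value })
            }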
}
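To show the merge behaviour in isolation, here is a simplified, self-contained sketch. Note the assumptions: this variant of Mapped drops the Map delegation and the insertOrUpdate helper, and builds the result with a local mutable map instead. The keys and numbers are made up for the example.

```kotlin
interface Semigroup<T> {
    operator fun plus(other: T): T
}

data class Sum(val value: Int) : Semigroup<Sum> {
    override operator fun plus(other: Sum): Sum = Sum(value + other.value)
}

/** Simplified variant of Mapped for illustration (no Map delegation). */
data class Mapped<K, V : Semigroup<V>>(val value: Map<K, V>) : Semigroup<Mapped<K, V>> {
    override operator fun plus(other: Mapped<K, V>): Mapped<K, V> {
        val merged = value.toMutableMap()
        for ((key, right) in other.value) {
            val left = merged[key]
            // Shared key: combine with V's semigroup. New key: just insert it.
            merged[key] = if (left != null) left + right else right
        }
        return Mapped(merged)
    }
}

fun main() {
    val first = Mapped(mapOf("apples" to Sum(3), "pears" to Sum(1)))
    val second = Mapped(mapOf("apples" to Sum(4), "plums" to Sum(2)))

    // "apples" appears in both maps, so its values are summed; the others are carried over.
    println((first + second).value)
    // {apples=Sum(value=7), pears=Sum(value=1), plums=Sum(value=2)}
}
```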

Each of these operations is implemented quite similarly to the code we used for each field in CandidateAggregate, but now we can reuse them for different aggregates, as well as test each in isolation. The cost is we have now spread this code across more types.
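One thing we can test in isolation is the associativity law itself. The sketch below uses a hypothetical helper, isAssociative, which checks the law for a single triple of values; it is only a spot check, not a proof. The Difference type is a deliberately unlawful counter-example invented for this illustration.

```kotlin
interface Semigroup<T> {
    operator fun plus(other: T): T
}

data class Sum(val value: Int) : Semigroup<Sum> {
    override operator fun plus(other: Sum): Sum = Sum(value + other.value)
}

data class Max<T : Comparable<T>>(val value: T) : Semigroup<Max<T>> {
    override operator fun plus(other: Max<T>): Max<T> = Max(maxOf(value, other.value))
}

/** Hypothetical counter-example: subtraction is not associative, so this is not a lawful semigroup. */
data class Difference(val value: Int) : Semigroup<Difference> {
    override operator fun plus(other: Difference): Difference = Difference(value - other.value)
}

/** Checks `a + (b + c) == (a + b) + c` for one particular triple of values. */
fun <T : Semigroup<T>> isAssociative(a: T, b: T, c: T): Boolean =
    a + (b + c) == (a + b) + c

fun main() {
    println(isAssociative(Sum(1), Sum(2), Sum(3)))    // true
    println(isAssociative(Max(7), Max(-2), Max(4)))   // true
    // 100 - (30 - 10) is 80, but (100 - 30) - 10 is 60.
    println(isAssociative(Difference(100), Difference(30), Difference(10)))  // false
}
```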

We can also write some general functions, concat and concatMap, to combine any list of Semigroup<T> values into a single Semigroup<T> value, effectively combining aggregates (see note 3). Here is an example of how to define and use these functions (as well as an example of testing Sum and Max in isolation):

/** Reduce a list of `T` to a single `T` using a semigroup operation */
fun <T : Semigroup<T>> concat(empty: T, items: Iterable<T>) = items.fold(empty, { acc, t -> acc + t }) // accumulator on the left keeps list order

/** Reduce a list of [A] by converting each item to a [T] with a semigroup instance, then combining to a single value using [concat]. */
fun <T : Semigroup<T>, A> concatMap(empty: T, items: Iterable<A>, f: (A) -> T) =
        concat(empty, items.map(f))

@Test
fun examples() {
    val list = listOf(42, 123, 19, 73)
    // concat and concatMap are declared as top-level functions above, so we call them directly.
    assertEquals(Sum(257), concatMap(Sum(0), list) { Sum(it) })
    assertEquals(Max(123), concatMap(Max(0), list) { Max(it) })
}
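Because the operation is associative, we are also free to combine partial results. The sketch below is an illustration under assumptions rather than part of the original design: it splits the same list into chunks of two (the chunk size is arbitrary), aggregates each chunk separately, then combines the partial results. It relies on Sum(0) leaving other values unchanged, since the empty value is folded in once per chunk.

```kotlin
interface Semigroup<T> {
    operator fun plus(other: T): T
}

data class Sum(val value: Int) : Semigroup<Sum> {
    override operator fun plus(other: Sum): Sum = Sum(value + other.value)
}

fun <T : Semigroup<T>> concat(empty: T, items: Iterable<T>): T =
    items.fold(empty) { acc, t -> acc + t }

fun <T : Semigroup<T>, A> concatMap(empty: T, items: Iterable<A>, f: (A) -> T): T =
    concat(empty, items.map(f))

fun main() {
    val list = listOf(42, 123, 19, 73)

    // Aggregate everything in one pass.
    val allAtOnce = concatMap(Sum(0), list) { Sum(it) }

    // Aggregate each chunk on its own, then combine the partial aggregates.
    val partials = list.chunked(2).map { chunk -> concatMap(Sum(0), chunk) { Sum(it) } }
    val combined = concat(Sum(0), partials)

    println(partials)               // [Sum(value=165), Sum(value=92)]
    println(allAtOnce == combined)  // true
}
```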

Using our aggregation types

Now we can rewrite CandidateAggregate using our aggregation types:

data class Aggregate(var data: Mapped<MonthYear, Sum>,
                     var earliestSampleDate: Nullable<Min<MonthYear>>,
                     var largestSample: Max<Int>,
                     var inRange: Sum) : Semigroup<Aggregate> {
    companion object {
        // Note: Max(0) only works as a neutral starting value if sample values are never negative.
        val empty = Aggregate(Mapped(emptyMap()), Nullable(null), Max(0), Sum(0))
    }

    override fun plus(other: Aggregate): Aggregate =
            Aggregate(data + other.data,
                    earliestSampleDate + other.earliestSampleDate,
                    largestSample + other.largestSample,
                    inRange + other.inRange)
}

The type of aggregation used appears explicitly for each field in Aggregate. For example largestSample: Max<Int> conveys both the type of the result (Int), as well as the process being used to calculate it (Max). In CandidateAggregate only the former was expressed. We also build some field types by composing semigroups, such as Mapped<MonthYear, Sum>, which specifies we will be adding values using Sum rather than some other approach. This also makes it very simple to update the method of aggregation (as illustrated below).

We have made Aggregate itself a semigroup to define how we combine these composite aggregates. We’ve also added an empty property to make it easier to call concat and concatMap.

The last piece we need is to translate a single Sample into an Aggregate, then we can do the entire aggregation using concatMap as shown in the aggregateSamples() test. Each Sample gets transformed into an Aggregate representing that individual sample (an aggregate of 1), then each Aggregate in turn gets combined to calculate the required information across all the samples.

fun aggregateSample(sample: Sample): Aggregate =
        Aggregate(Mapped(mapOf(sample.date to Sum(sample.value))),
                Nullable(Min(sample.date)),
                Max(sample.value),
                sample.value.countWithin(100..200))

fun <T : Comparable<T>> T.countWithin(range: ClosedRange<T>) =
        Sum(if (range.contains(this)) 1 else 0)

@Test
fun aggregateSamples() {
    // Aggregation
    val result = concatMap(Aggregate.empty, samples) { aggregateSample(it) }

    // Actual results are equivalent to the individual queries on the left:
    assertEquals(samples.minBy { it.date }?.date, result.earliestSampleDate.value?.value)
    assertEquals(samples.maxBy { it.value }?.value, result.largestSample.value)
    assertEquals(samples.count { (100..200).contains(it.value) }, result.inRange.value)
    val june2017 = MonthYear(2017, 6)
    assertEquals(samples.filter { x -> june2017 == x.date }.sumBy { it.value }, result.data[june2017]?.value)
}

What have we gained for the price?

This definitely has more pieces than the CandidateAggregate version (although the code for each piece has not changed much, it is now spread over multiple types). More pieces suggest a performance impact, but I have not measured this.

We do get a few benefits for this price. Firstly, we now have some small, simple, genuinely reusable aggregation types (Sum, Max, Min, Mapped etc.). These can be combined into other aggregates, and they can be tested in isolation. Secondly, we explicitly define aggregate types in terms of the aggregates of which they are composed. We don’t have an aggregate that contains an Int, we have a Sum or a Max<Int> which conveys more information as to the aggregation process, as well as preventing errors (summing two Int values that should have been combined using maxOf for example).

We also make it simpler to change our aggregation. For example, if we wanted to change from reporting the total value to the maximum value for each month, we can change Mapped<MonthYear, Sum> to Mapped<MonthYear, Max<Int>> and the aggregation process will adjust accordingly.
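Here is a rough, self-contained sketch of that change. It makes several assumptions for brevity: Mapped is a simplified variant without Map delegation, MonthYear omits its ordering (not needed for map keys), concatMap is written as a single fold, and the sample data is invented. The only difference between the two functions is the semigroup chosen for the map values.

```kotlin
interface Semigroup<T> {
    operator fun plus(other: T): T
}

data class Sum(val value: Int) : Semigroup<Sum> {
    override operator fun plus(other: Sum): Sum = Sum(value + other.value)
}

data class Max<T : Comparable<T>>(val value: T) : Semigroup<Max<T>> {
    override operator fun plus(other: Max<T>): Max<T> = Max(maxOf(value, other.value))
}

/** Simplified variant of Mapped for illustration (no Map delegation). */
data class Mapped<K, V : Semigroup<V>>(val value: Map<K, V>) : Semigroup<Mapped<K, V>> {
    override operator fun plus(other: Mapped<K, V>): Mapped<K, V> {
        val merged = value.toMutableMap()
        for ((key, right) in other.value) {
            val left = merged[key]
            merged[key] = if (left != null) left + right else right
        }
        return Mapped(merged)
    }
}

data class MonthYear(val year: Int, val month: Int)  // ordering omitted in this sketch
data class Sample(val date: MonthYear, val value: Int)

fun <T : Semigroup<T>, A> concatMap(empty: T, items: Iterable<A>, f: (A) -> T): T =
    items.fold(empty) { acc, a -> acc + f(a) }

// Hypothetical sample data for the illustration.
val samples = listOf(
    Sample(MonthYear(2017, 6), 120),
    Sample(MonthYear(2017, 6), 80),
    Sample(MonthYear(2017, 7), 50)
)

/** Total value per month: map values are combined with Sum. */
fun totalsByMonth(items: List<Sample>): Mapped<MonthYear, Sum> =
    concatMap(Mapped<MonthYear, Sum>(emptyMap()), items) { Mapped(mapOf(it.date to Sum(it.value))) }

/** Largest value per month: same shape, but map values are combined with Max. */
fun maximaByMonth(items: List<Sample>): Mapped<MonthYear, Max<Int>> =
    concatMap(Mapped<MonthYear, Max<Int>>(emptyMap()), items) { Mapped(mapOf(it.date to Max(it.value))) }

fun main() {
    val june = MonthYear(2017, 6)
    println(totalsByMonth(samples).value[june])  // Sum(value=200)
    println(maximaByMonth(samples).value[june])  // Max(value=120)
}
```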

Conclusion

We introduced a Semigroup<T> interface which represents values that can be combined with an associative, binary operation. We also introduced concat and concatMap operations that work for any instance of this interface. We created Sum, Max, Min, Nullable and Mapped instances of this interface to represent common methods of aggregation, then built a custom Aggregate semigroup composed of some of these instances.

This is a bit more complex than manually aggregating a set of values over a loop or fold, but in return it gives us reusable and testable aggregate types, more communicative types for our aggregate model, and fewer opportunities for bugs in the aggregation process. It also makes creating new aggregates and modifying existing ones simpler.


  1. Example of multiple queries:

    val minDate = samples.map { it.date }.min()
    val maxSample = samples.map { it.value }.max()
    val inRangeCount = samples.count { (100..200).contains(it.value) }
  2. A semigroup for a type T consists of a closed binary operation T -> T -> T that is also associative (i.e. a + (b + c) == (a + b) + c). This associativity constraint means we can combine and compose these values fairly flexibly. For example, we can do a + b + c, without having to worry about whether b is itself a composite of x + y, as associativity guarantees a + (x + y) + c is the same as ((a + x) + y) + c. We can’t do the same thing with non-associative operations like subtraction:

    100 - (30 - 10) - 5 /= ((100 - 30) - 10) - 5
    75 /= 55

    The end result is we can use associativity to combine values without having to also take evaluation order into account.

  3. Both concat and concatMap take an empty: T value for cases where the items list is empty. We could use a Monoid constraint instead of Semigroup, which adds the concept of an empty identity element, but I found this messy to implement in Kotlin.
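For the curious, regarding note 3: one possible encoding of a monoid in Kotlin is to keep the operations in a separate object that gets passed in explicitly, rather than having the values implement an interface. This is only a sketch of that idea under assumptions; the Monoid interface, the IntSum and IntMax objects, and this variant of concat are all hypothetical names for illustration and not the approach used in this post.

```kotlin
/** Hypothetical encoding: the empty value and the combining operation live in a separate object. */
interface Monoid<T> {
    val empty: T
    fun combine(a: T, b: T): T
}

object IntSum : Monoid<Int> {
    override val empty: Int = 0
    override fun combine(a: Int, b: Int): Int = a + b
}

object IntMax : Monoid<Int> {
    // Int.MIN_VALUE is the identity for maxOf over Int.
    override val empty: Int = Int.MIN_VALUE
    override fun combine(a: Int, b: Int): Int = maxOf(a, b)
}

/** No separate `empty` argument needed: the monoid supplies it. */
fun <T> concat(monoid: Monoid<T>, items: Iterable<T>): T =
    items.fold(monoid.empty) { acc, t -> monoid.combine(acc, t) }

fun main() {
    val list = listOf(42, 123, 19, 73)
    println(concat(IntSum, list))              // 257
    println(concat(IntMax, list))              // 123
    println(concat(IntSum, emptyList<Int>()))  // 0
}
```

The trade-off is that callers must choose and pass the monoid explicitly, and the field types no longer say which aggregation is in use, which is one of the benefits of the Semigroup version.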
