Created 2024/12/01 at 09:25PM

Last Modified 2025/01/09 at 09:11PM

A few years back, I was working on a codebase that processed some very large datasets. As more and more data came in for processing, both the analytics runtime and the memory consumption kept blowing up. After some profiling, I found that one line in the code was performing an operation like

z = df.groupby("ID").agg({"VAL1": ",".join, "VAL2": ",".join}).reset_index()

This single line caused a ~30 GB spike in memory usage (baseline usage was around 20 GB, so total usage peaked at ~50 GB), and about 50% of the processing time was spent here.

Analyzing your data can give you great insights that help you choose the right approach.

In this post, I'll share a custom approach to perform fast grouping and aggregation. This method shines when the number of unique values in the columns to be aggregated is small (e.g., ≤ 10), achieving significant performance improvements over conventional approaches. We’ll also dive into how this technique aligns with concepts like one-hot encoding and cross-tabulation while avoiding the inefficiencies of full matrix operations.

The df in question had exactly these properties: the unique counts for VAL1 and VAL2 were just 7 and 13 respectively. So we can use this approach to optimize the process and see how it performs.
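If you're not sure whether your own data fits that pattern, a quick cardinality check is enough to find out (using the column names from the snippet above; the counts are the ones from my dataset):

df[["VAL1", "VAL2"]].nunique()  # VAL1 -> 7, VAL2 -> 13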

The Approach

import numpy as np
import pandas as pd


def better_groupby(df, group_col, agg_cols):
    # Factorize the group column once: integer codes per row plus the array of unique groups.
    g_labels, g_uniques = pd.factorize(df[group_col])
    res = None
    for col in agg_cols:
        # Factorize the column to aggregate (missing values become empty strings).
        labels, uniques = pd.factorize(df[col].fillna(value=""))
        # Build a count matrix: one row per group, one column per unique value of `col`.
        mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
        # np.add.at accumulates repeated indices, so every (group, value) pair gets counted.
        np.add.at(mat, (g_labels, labels), 1)
        df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
        # Replace positive counts with the value itself and zeros with an empty string.
        for c in df_.columns:
            df_[c] = np.where(df_[c] >= 1, c, "")
        # Join the values row-wise, then strip the leading/trailing commas left by empty slots.
        df_[col] = ""
        df_[col] = df_.agg(",".join, axis=1).str.strip(",")
        df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
        df_ = df_.rename_axis(group_col).reset_index()
        if res is None:
            res = df_
        else:
            res = pd.merge(res, df_, on=group_col)
    return res[[group_col] + agg_cols]

Let's see how it works, step by step, on a small example:

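>>> # a small example frame matching the walkthrough below
>>> df = pd.DataFrame({"tid": [1, 1, 2, 3, 3, 3], "val1": list("abcdef")})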
>>> df
  tid val1
0   1    a
1   1    b
2   2    c
3   3    d
4   3    e
5   3    f
>>> better_groupby(df, "tid", ["val1"])
  tid   val1
0   1    a,b
1   2      c
2   3  d,e,f
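>>> # for reference, the intermediate variables used below come from the
>>> # factorize calls at the top of better_groupby (here col is "val1")
>>> col = "val1"
>>> g_labels, g_uniques = pd.factorize(df["tid"])
>>> labels, uniques = pd.factorize(df[col].fillna(value=""))
>>> g_labels
array([0, 0, 1, 2, 2, 2])
>>> labels
array([0, 1, 2, 3, 4, 5])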
>>> mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
>>> np.add.at(mat, (g_labels, labels), 1)
>>> mat
array([[1, 1, 0, 0, 0, 0],
       [0, 0, 1, 0, 0, 0],
       [0, 0, 0, 1, 1, 1]])
>>> df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
>>> df_
   a  b  c  d  e  f
1  1  1  0  0  0  0
2  0  0  1  0  0  0
3  0  0  0  1  1  1
>>> for c in df_.columns:
...     df_[c] = np.where(df_[c] >= 1, c, "")
...
>>> df_
   a  b  c  d  e  f
1  a  b            
2        c         
3           d  e  f
>>> df_[col] = ""
>>> df_[col] = df_.agg(",".join, axis=1).str.strip(",")
>>> df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
>>> df_
    val1
1    a,b
2      c
3  d,e,f
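
As an aside, the intermediate count matrix is just a cross-tabulation of the group column against the value column, which is the link to one-hot encoding and cross-tabulation mentioned earlier. For this small frame, pd.crosstab reproduces it exactly (crosstab sorts its labels while factorize keeps first-appearance order; here the two happen to coincide):

>>> pd.crosstab(df["tid"], df["val1"]).to_numpy()
array([[1, 1, 0, 0, 0, 0],
       [0, 0, 1, 0, 0, 0],
       [0, 0, 0, 1, 1, 1]])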

Great! Now, how does it actually perform?

Performance Comparison

I've written some utility functions for measuring time and memory, and generated a sample dataframe with 1 million rows, stored as a CSV.
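The exact generation script doesn't matter much; a minimal sketch that produces a frame of roughly this shape (a tid group column plus two low-cardinality string columns, mirroring the 7 and 13 unique values from the original data) could look like the one below. The group count and the value alphabets are arbitrary choices for illustration, not the actual data behind the numbers that follow.

# make_data.py (hypothetical generator -- the real data.csv is not shown here)
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n_rows = 1_000_000
n_groups = 100_000  # assumed number of distinct tids

df = pd.DataFrame({
    "tid": rng.integers(0, n_groups, size=n_rows),
    "val1": rng.choice(list("abcdefg"), size=n_rows),               # 7 unique values
    "val2": rng.choice([f"v{i}" for i in range(13)], size=n_rows),  # 13 unique values
})
df.to_csv("./data.csv", index=False)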

# utils.py
from contextlib import contextmanager

import os
import psutil
import time


@contextmanager
def memory_usage():
    process = psutil.Process(os.getpid())
    memory_before = process.memory_info().rss / (1024 ** 2)
    print(f"Memory usage before: {memory_before:.2f} MB")
    try:
        yield
    finally:
        memory_after = process.memory_info().rss / (1024 ** 2)
        print(f"Memory usage after: {memory_after:.2f} MB")
        print(f"Memory used: {memory_after - memory_before:.2f} MB")


@contextmanager
def timer():
    start = time.perf_counter()
    try:
        yield
    finally:
        end = time.perf_counter()
        print(f"Time taken: {end - start} seconds")

# normal.py
import pandas as pd

from utils import memory_usage, timer

def group(df, group_col, agg_cols):
    return df.groupby(group_col).agg({col: ','.join for col in agg_cols}).reset_index()


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    with memory_usage(), timer():
        _ = group(df, "tid", ["val1", "val2"])

# optimized.py
import numpy as np
import pandas as pd

from utils import memory_usage, timer


def better_groupby(df, group_col, agg_cols):
    g_labels, g_uniques = pd.factorize(df[group_col])
    res = None
    for col in agg_cols:
        labels, uniques = pd.factorize(df[col].fillna(value=""))
        mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
        np.add.at(mat, (g_labels, labels), 1)
        df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
        for c in df_.columns:
            df_[c] = np.where(df_[c] >= 1, c, "")
        df_[col] = ""
        df_[col] = df_.agg(','.join, axis=1).str.strip(',')
        df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
        df_ = df_.rename_axis(group_col).reset_index()
        if res is None:
            res = df_
        else:
            res = pd.merge(res, df_, on=group_col)

    return res[[group_col] + agg_cols]


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    with memory_usage(), timer():
        _ = better_groupby(df, "tid", ["val1", "val2"])

Running both scripts:

$ python normal.py
Memory usage before: 620.79 MB
Time taken: 4.337185889999091 seconds
Memory usage after: 1422.88 MB
Memory used: 802.09 MB

$ python optimized.py
Memory usage before: 620.86 MB
Time taken: 3.781618000997696 seconds
Memory usage after: 735.93 MB
Memory used: 115.07 MB

So, we see an improvement in:

- runtime: from 4.34 seconds down to 3.78 seconds (~13% faster)
- memory consumption: from 802.09 MB down to 115.07 MB (~7x less)
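
Before dropping this into a real pipeline, it's also worth a quick sanity check that both approaches produce equivalent groupings. A direct frame comparison can fail because the two methods may order the joined values differently (and the matrix approach keeps each distinct value only once per group), so the hypothetical check below compares value sets per group instead; it wasn't part of the benchmark above.

# check.py (hypothetical sanity check)
import pandas as pd

from normal import group
from optimized import better_groupby

df = pd.read_csv("./data.csv")
a = group(df, "tid", ["val1", "val2"]).set_index("tid").sort_index()
b = better_groupby(df, "tid", ["val1", "val2"]).set_index("tid").sort_index()

# Compare value *sets* per group, since ordering and duplicates can differ.
for col in ["val1", "val2"]:
    same = a[col].str.split(",").map(set) == b[col].str.split(",").map(set)
    print(f"{col}: value sets match for {same.mean():.1%} of groups")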