Created 2024/12/01 at 09:25PM
Last Modified 2025/01/09 at 09:11PM
A few years back, I was working with some very large datasets in a codebase. As more and more data came in for processing, both the analytics runtime and the memory consumption kept blowing up. After some profiling, I found that one line in the code performed an operation like
z = df.groupby("ID").agg({"VAL1": ",".join, "VAL2": ",".join}).reset_index()
This single line caused a ~30 GB spike in memory usage (without the spike, average memory usage was around 20 GB), pushing the total to ~50 GB, and ~50% of the processing time was spent here.
Analyzing your data can give you great insights that help you choose the right approach.
In this post, I'll share a custom approach to perform fast grouping and aggregation. This method shines when the number of unique values in the columns to be aggregated is small (e.g., ≤ 10), achieving significant performance improvements over conventional approaches. We’ll also dive into how this technique aligns with concepts like one-hot encoding and cross-tabulation while avoiding the inefficiencies of full matrix operations.
The above df in question had similar properties: the unique counts for VAL1 and VAL2 were 7 and 13. So we can use this approach to optimize the process and see how it performs.
def better_groupby(df, group_col, agg_cols):
    # integer label per row for the group key, plus the unique group keys
    g_labels, g_uniques = pd.factorize(df[group_col])
    res = None
    for col in agg_cols:
        # integer label per row for the column being aggregated
        labels, uniques = pd.factorize(df[col].fillna(value=''))
        # count matrix: one row per group, one column per unique value of `col`
        mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
        np.add.at(mat, (g_labels, labels), 1)
        df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
        # replace positive counts with the value itself, zero counts with ''
        for c in df_.columns:
            df_[c] = np.where(df_[c] >= 1, c, '')
        # join each row into one string and trim the commas left by empty cells
        df_[col] = ''
        df_[col] = df_.agg(",".join, axis=1).str.strip(",")
        df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
        df_ = df_.rename_axis(group_col).reset_index()
        if res is None:
            res = df_
        else:
            res = pd.merge(res, df_, on=group_col)
    return res[[group_col] + agg_cols]
Let's see how it works, line by line. But first, a quick look at what it produces:
>>> df
  tid val1
0   1    a
1   1    b
2   2    c
3   3    d
4   3    e
5   3    f
>>> better_groupby(df, "tid", ["val1"])
  tid   val1
0   1    a,b
1   2      c
2   3  d,e,f
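For reference (a quick sanity check, not in the original walkthrough), the conventional groupby-and-join produces the same strings on this toy frame:
>>> df.groupby("tid").agg({"val1": ",".join}).reset_index()
  tid   val1
0   1    a,b
1   2      c
2   3  d,e,f
With that baseline established, here's what each line does.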
g_labels, g_uniques = pd.factorize(df[group_col])
- This gives an integer label for each value of df[group_col], along with the unique values of df[group_col]. In our case, g_labels will be array([0, 0, 1, 2, 2, 2]) and g_uniques will be Index(['1', '2', '3'], dtype='object').
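As a quick check on what factorize gives us (a side note, assuming the string-typed tid shown above), indexing the uniques with the labels reconstructs the original column:
>>> g_uniques[g_labels]
Index(['1', '1', '2', '3', '3', '3'], dtype='object')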
labels, uniques = pd.factorize(df[col].fillna(value=""))
- This gives the labels and uniques for the column to be aggregated. In our case, labels will be array([0, 1, 2, 3, 4, 5]) and uniques will be Index(['a', 'b', 'c', 'd', 'e', 'f'], dtype='object').
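Why the fillna(value="") in that line? pd.factorize maps missing values to the sentinel label -1, and a -1 label would index the last column of mat through negative-index wraparound, silently corrupting the counts (a small illustration, not part of the original walkthrough):
>>> pd.factorize(pd.Series(["a", None, "a"]))
(array([ 0, -1,  0]), Index(['a'], dtype='object'))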
The creation of the matrix with np.zeros and the subsequent scatter-add of the labels (np.add.at(mat, (g_labels, labels), 1)) mimics a form of one-hot encoding, where each unique value of val1 gets assigned a column index.
>>> mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
>>> np.add.at(mat, (g_labels, labels), 1)
>>> mat
array([
[1, 1, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, 1, 1]
])
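Why np.add.at instead of plain fancy-indexed assignment? Real data usually repeats the same (group, value) pair across many rows, and only the unbuffered np.add.at accumulates those repeats (a small illustration, not part of the original code):
>>> m = np.zeros((1, 1), dtype=np.int64)
>>> m[[0, 0], [0, 0]] += 1             # duplicate index pair counted only once
>>> m
array([[1]])
>>> np.add.at(m, ([0, 0], [0, 0]), 1)  # every occurrence counted
>>> m
array([[3]])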
Here, the columns of mat correspond to the unique values of val1 and the rows correspond to the unique values of tid. Let's take a look at the next step.
>>> df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
>>> df_
   a  b  c  d  e  f
1  1  1  0  0  0  0
2  0  0  1  0  0  0
3  0  0  0  1  1  1
Next, we replace the 1s with the column name and the 0s with an empty string, then concatenate all the columns of each row into a single comma-separated string. Cells holding 0 contribute empty tokens, so the joined string is passed through str.strip(",") to trim the extra commas at either end.
>>> for c in df_.columns:
...     df_[c] = np.where(df_[c] >= 1, c, "")
...
>>> df_
   a  b  c  d  e  f
1  a  b
2        c
3           d  e  f
>>> df_[col] = ""
>>> df_[col] = df_.agg(','.join, axis=1).str.strip(',')
>>> df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
>>> df_
    val1
1    a,b
2      c
3  d,e,f
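Incidentally, this count matrix is exactly the cross-tabulation mentioned at the start: pd.crosstab builds the same table directly, with labels attached (shown here only to connect the dots, it isn't used in the optimized code):
>>> pd.crosstab(df["tid"], df["val1"])
val1  a  b  c  d  e  f
tid
1     1  1  0  0  0  0
2     0  0  1  0  0  0
3     0  0  0  1  1  1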
Great! Now, how does it perform?
I've written some utility functions and generated a sample dataframe with 1 million rows, stored as a CSV.
# utils.py
from contextlib import contextmanager
import os
import psutil
import time


@contextmanager
def memory_usage():
    process = psutil.Process(os.getpid())
    memory_before = process.memory_info().rss / (1024 ** 2)
    print(f"Memory usage before: {memory_before:.2f} MB")
    try:
        yield
    finally:
        memory_after = process.memory_info().rss / (1024 ** 2)
        print(f"Memory usage after: {memory_after:.2f} MB")
        print(f"Memory used: {memory_after - memory_before:.2f} MB")


@contextmanager
def timer():
    start = time.perf_counter()
    try:
        yield
    finally:
        end = time.perf_counter()
        print(f"Time taken: {end - start} seconds")
# normal.py
import pandas as pd
from utils import *


def group(df, group_col, agg_cols):
    return df.groupby(group_col).agg({col: ','.join for col in agg_cols}).reset_index()


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    with memory_usage(), timer():
        _ = group(df, "tid", ["val1", "val2"])
# optimized.py
import numpy as np
import pandas as pd
from utils import *


def better_groupby(df, group_col, agg_cols):
    g_labels, g_uniques = pd.factorize(df[group_col])
    res = None
    for col in agg_cols:
        labels, uniques = pd.factorize(df[col].fillna(value=""))
        mat = np.zeros((len(g_uniques), len(uniques)), dtype=np.int64)
        np.add.at(mat, (g_labels, labels), 1)
        df_ = pd.DataFrame(mat, index=g_uniques, columns=uniques)
        for c in df_.columns:
            df_[c] = np.where(df_[c] >= 1, c, "")
        df_[col] = ""
        df_[col] = df_.agg(','.join, axis=1).str.strip(',')
        df_.drop(columns=[c for c in df_.columns if c != col], inplace=True)
        df_ = df_.rename_axis(group_col).reset_index()
        if res is None:
            res = df_
        else:
            res = pd.merge(res, df_, on=group_col)
    return res[[group_col] + agg_cols]


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    with memory_usage(), timer():
        _ = better_groupby(df, "tid", ["val1", "val2"])
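Before comparing numbers, here is a rough way to convince yourself the two implementations agree (a sketch, not part of the original post; it assumes data.csv has no missing values and ignores per-group ordering and duplicates, since better_groupby emits each unique value once, in first-appearance order):
# sanity_check.py (hypothetical helper script)
import pandas as pd
from normal import group
from optimized import better_groupby


def normalize(s):
    # reduce each joined string to a sorted set of non-empty tokens so that
    # ordering, duplicates, and padding commas don't affect the comparison
    return s.str.split(",").map(lambda xs: tuple(sorted({x for x in xs if x})))


if __name__ == "__main__":
    df = pd.read_csv("./data.csv")
    a = group(df, "tid", ["val1", "val2"]).sort_values("tid").reset_index(drop=True)
    b = better_groupby(df, "tid", ["val1", "val2"]).sort_values("tid").reset_index(drop=True)
    assert a["tid"].equals(b["tid"])
    for col in ["val1", "val2"]:
        assert normalize(a[col]).equals(normalize(b[col]))
    print("outputs agree (up to ordering and duplicates)")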
$ python normal.py
Memory usage before: 620.79 MB
Time taken: 4.337185889999091 seconds
Memory usage after: 1422.88 MB
Memory used: 802.09 MB
$ python optimized.py
Memory usage before: 620.86 MB
Time taken: 3.781618000997696 seconds
Memory usage after: 735.93 MB
Memory used: 115.07 MB
So, we see an improvement in:
- runtime: from 4.34 seconds down to 3.78 seconds
- memory consumption: from 802.09 MB down to 115.07 MB