Original file line numberDiff line numberDiff line change
Expand Up@@ -373,6 +373,61 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)

agg = aggregate

def _agg_for_resample(
self, func=None, *args, engine=None, engine_kwargs=None, **kwargs
):
relabeling = func is None
columns = None
if relabeling:
columns, func = validate_func_kwargs(kwargs)
kwargs = {}

if isinstance(func, str):
if maybe_use_numba(engine) and engine is not None:
# Not all agg functions support numba, only propagate numba kwargs
# if user asks for numba, and engine is not None
# (if engine is None, the called function will handle the case where
# numba is requested via the global option)
kwargs["engine"] = engine
if engine_kwargs is not None:
kwargs["engine_kwargs"] = engine_kwargs
return getattr(self, func)(*args, **kwargs)

elif isinstance(func, abc.Iterable):
# Catch instances of lists / tuples
# but not the class list / tuple itself.
func = maybe_mangle_lambdas(func)
kwargs["engine"] = engine
kwargs["engine_kwargs"] = engine_kwargs
ret = self._aggregate_multiple_funcs(func, *args, **kwargs)
if relabeling:
# columns is not narrowed by mypy from relabeling flag
assert columns is not None # for mypy
ret.columns = columns
if not self.as_index:
ret = ret.reset_index()
return ret

else:
if maybe_use_numba(engine):
return self._aggregate_with_numba(
func, *args, engine_kwargs=engine_kwargs, **kwargs
)

if self.ngroups == 0:
# e.g. test_evaluate_with_empty_groups without any groups to
# iterate over, we have no output on which to do dtype
# inference. We default to using the existing dtype.
# xref GH#51445
obj = self._obj_with_exclusions
return self.obj._constructor(
[],
name=self.obj.name,
index=self._grouper.result_index,
dtype=obj.dtype,
)
return self._python_agg_general(func, *args, **kwargs)

def _python_agg_general(self, func, *args, **kwargs):
f = lambda x: func(x, *args, **kwargs)

Expand DownExpand Up@@ -1501,6 +1556,61 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)
relabeling, func, columns, order = reconstruct_func(func, **kwargs)
func = maybe_mangle_lambdas(func)

if maybe_use_numba(engine):
# Not all agg functions support numba, only propagate numba kwargs
# if user asks for numba
kwargs["engine"] = engine
kwargs["engine_kwargs"] = engine_kwargs

op = GroupByApply(self, func, args=args, kwargs=kwargs)
result = op.agg()
if not is_dict_like(func) and result is not None:
# GH #52849
if not self.as_index and is_list_like(func):
return result.reset_index()
else:
return result
elif relabeling:
# this should be the only (non-raising) case with relabeling
# used reordered index of columns
result = cast(DataFrame, result)
result = result.iloc[:, order]
result = cast(DataFrame, result)
# error: Incompatible types in assignment (expression has type
# "Optional[List[str]]", variable has type
# "Union[Union[Union[ExtensionArray, ndarray[Any, Any]],
# Index, Series], Sequence[Any]]")
result.columns = columns # type: ignore[assignment]

if result is None:
# Remove the kwargs we inserted
# (already stored in engine, engine_kwargs arguments)
if "engine" in kwargs:
del kwargs["engine"]
del kwargs["engine_kwargs"]
# at this point func is not a str, list-like, dict-like,
# or a known callable(e.g. sum)
if maybe_use_numba(engine):
return self._aggregate_with_numba(
func, *args, engine_kwargs=engine_kwargs, **kwargs
)
# grouper specific aggregations
result = self._python_agg_general(func, *args, **kwargs)

if not self.as_index:
result = self._insert_inaxis_grouper(result)
result.index = default_index(len(result))

return result

agg = aggregate

def _agg_for_resample(
self, func=None, *args, engine=None, engine_kwargs=None, **kwargs
):
relabeling, func, columns, order = reconstruct_func(func, **kwargs)
func = maybe_mangle_lambdas(func)

if maybe_use_numba(engine):
# Not all agg functions support numba, only propagate numba kwargs
# if user asks for numba
Expand DownExpand Up@@ -1577,8 +1687,6 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)

return result

agg = aggregate

def _python_agg_general(self, func, *args, **kwargs):
f = lambda x: func(x, *args, **kwargs)

Expand All@@ -1589,9 +1697,11 @@ def _python_agg_general(self, func, *args, **kwargs):

obj = self._obj_with_exclusions

if not len(obj.columns):
if self._grouper._is_resample and not len(obj.columns):
# e.g. test_margins_no_values_no_cols
return self._python_apply_general(f, self._selected_obj)
return obj._constructor(
index=self._grouper.result_index, columns=obj.columns
)

output: dict[int, ArrayLike] = {}
for idx, (name, ser) in enumerate(obj.items()):
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -589,6 +589,7 @@ def __init__(
self._groupings: list[grouper.Grouping] = list(groupings)
self._sort = sort
self.dropna = dropna
self._is_resample = False

@property
def groupings(self) -> list[grouper.Grouping]:
Expand DownExpand Up@@ -940,12 +941,14 @@ def _aggregate_series_pure_python(

for i, group in enumerate(splitter):
res = func(group)
res = extract_result(res)

if not initialized:
# We only do this validation on the first iteration
check_result_array(res, group.dtype)
initialized = True
if self._is_resample:
res = extract_result(res)

if not initialized:
# We only do this validation on the first iteration
check_result_array(res, group.dtype)
initialized = True

result[i] = res

Expand Down
Loading