File tree

11 files changed

+290
-282
lines changed

11 files changed

+290
-282
lines changed
Original file line number | Diff line number | Diff line change
@@ -211,8 +211,8 @@ def column_ids(self) -> typing.Sequence[str]:
211211
return tuple(self._column_names.keys())
212212

213213
@property
214-
def hidden_ordering_columns(self) -> typing.Tuple[ibis_types.Value, ...]:
215-
return self._hidden_ordering_columns
214+
def _hidden_column_ids(self) -> typing.Sequence[str]:
215+
return tuple(self._hidden_ordering_column_names.keys())
216216

217217
@property
218218
def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]:
@@ -400,24 +400,23 @@ def _hide_column(self, column_id) -> ArrayValue:
400400
expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name})
401401
return expr_builder.build()
402402

403-
def promote_offsets(self) -> typing.Tuple[ArrayValue, str]:
403+
def promote_offsets(self, col_id: str) -> ArrayValue:
404404
"""
405405
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
406406
"""
407407
# Special case: offsets already exist
408408
ordering = self._ordering
409409

410410
if (not ordering.is_sequential) or (not ordering.total_order_col):
411-
return self._project_offsets().promote_offsets()
412-
col_id = bigframes.core.guid.generate_guid()
411+
return self._project_offsets().promote_offsets(col_id)
413412
expr_builder = self.builder()
414413
expr_builder.columns = [
415414
self._get_any_column(ordering.total_order_col.column_id).name(col_id),
416415
*self.columns,
417416
]
418-
return expr_builder.build(), col_id
417+
return expr_builder.build()
419418

420-
def select_columns(self, column_ids: typing.Sequence[str]):
419+
def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
421420
return self._projection(
422421
[self._get_ibis_column(col_id) for col_id in column_ids]
423422
)
@@ -807,7 +806,7 @@ def _create_order_columns(
807806
elif ordering_mode == "string_encoded":
808807
return (self._create_string_ordering_column().name(order_col_name),)
809808
elif expose_hidden_cols:
810-
return self.hidden_ordering_columns
809+
return self._hidden_ordering_columns
811810
return ()
812811

813812
def _create_offset_column(self) -> ibis_types.IntegerColumn:
Original file line number | Diff line number | Diff line change
@@ -40,8 +40,8 @@ def equals(block1: blocks.Block, block2: blocks.Block) -> bool:
4040

4141
equality_ids = []
4242
for lcol, rcol in zip(block1.value_columns, block2.value_columns):
43-
lcolmapped = lmap(lcol)
44-
rcolmapped = rmap(rcol)
43+
lcolmapped = lmap[lcol]
44+
rcolmapped = rmap[rcol]
4545
joined_block, result_id = joined_block.apply_binary_op(
4646
lcolmapped, rcolmapped, ops.eq_nulls_match_op
4747
)
@@ -563,8 +563,8 @@ def align_rows(
563563
joined_index, (get_column_left, get_column_right) = left_block.index.join(
564564
right_block.index, how=join
565565
)
566-
left_columns = [get_column_left(col) for col in left_block.value_columns]
567-
right_columns = [get_column_right(col) for col in right_block.value_columns]
566+
left_columns = [get_column_left[col] for col in left_block.value_columns]
567+
right_columns = [get_column_right[col] for col in right_block.value_columns]
568568

569569
left_block = joined_index._block.select_columns(left_columns)
570570
right_block = joined_index._block.select_columns(right_columns)
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import bigframes.core.guid as guid
4040
import bigframes.core.indexes as indexes
4141
import bigframes.core.joins as joins
42+
import bigframes.core.joins.name_resolution as join_names
4243
import bigframes.core.ordering as ordering
4344
import bigframes.core.utils
4445
import bigframes.core.utils as utils
@@ -97,7 +98,8 @@ def __init__(
9798
"'index_columns' and 'index_labels' must have equal length"
9899
)
99100
if len(index_columns) == 0:
100-
expr, new_index_col_id = expr.promote_offsets()
101+
new_index_col_id = guid.generate_guid()
102+
expr = expr.promote_offsets(new_index_col_id)
101103
index_columns = [new_index_col_id]
102104
self._index_columns = tuple(index_columns)
103105
# Index labels don't need complicated hierarchical access so can store as tuple
@@ -260,7 +262,8 @@ def reset_index(self, drop: bool = True) -> Block:
260262
from Index classes that point to this block.
261263
"""
262264
block = self
263-
expr, new_index_col_id = self._expr.promote_offsets()
265+
new_index_col_id = guid.generate_guid()
266+
expr = self._expr.promote_offsets(new_index_col_id)
264267
if drop:
265268
# Even though the index might be part of the ordering, keep that
266269
# ordering expression as reset_index shouldn't change the row
@@ -833,7 +836,8 @@ def aggregate_all_and_stack(
833836
else: # axis_n == 1
834837
# using offsets as identity to group on.
835838
# TODO: Allow to promote identity/total_order columns instead for better perf
836-
expr_with_offsets, offset_col = self.expr.promote_offsets()
839+
offset_col = guid.generate_guid()
840+
expr_with_offsets = self.expr.promote_offsets(offset_col)
837841
stacked_expr = expr_with_offsets.unpivot(
838842
row_labels=self.column_labels.to_list(),
839843
index_col_ids=[guid.generate_guid()],
@@ -952,9 +956,10 @@ def aggregate(
952956
]
953957
by_column_labels = self._get_labels_for_columns(by_value_columns)
954958
labels = (*by_column_labels, *aggregate_labels)
955-
result_expr_pruned, offsets_id = result_expr.select_columns(
959+
offsets_id = guid.generate_guid()
960+
result_expr_pruned = result_expr.select_columns(
956961
[*by_value_columns, *output_col_ids]
957-
).promote_offsets()
962+
).promote_offsets(offsets_id)
958963

959964
return (
960965
Block(
@@ -975,7 +980,8 @@ def get_stat(self, column_id: str, stat: agg_ops.AggregateOp):
975980

976981
aggregations = [(column_id, stat, stat.name) for stat in stats_to_fetch]
977982
expr = self.expr.aggregate(aggregations)
978-
expr, offset_index_id = expr.promote_offsets()
983+
offset_index_id = guid.generate_guid()
984+
expr = expr.promote_offsets(offset_index_id)
979985
block = Block(
980986
expr,
981987
index_columns=[offset_index_id],
@@ -999,7 +1005,8 @@ def get_corr_stat(self, column_id_left: str, column_id_right: str):
9991005
)
10001006
]
10011007
expr = self.expr.corr_aggregate(corr_aggregations)
1002-
expr, offset_index_id = expr.promote_offsets()
1008+
offset_index_id = guid.generate_guid()
1009+
expr = expr.promote_offsets(offset_index_id)
10031010
block = Block(
10041011
expr,
10051012
index_columns=[offset_index_id],
@@ -1197,7 +1204,8 @@ def retrieve_repr_request_results(
11971204
return formatted_df, count, query_job
11981205

11991206
def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
1200-
expr, result_id = self._expr.promote_offsets()
1207+
result_id = guid.generate_guid()
1208+
expr = self._expr.promote_offsets(result_id)
12011209
return (
12021210
Block(
12031211
expr,
@@ -1471,67 +1479,76 @@ def merge(
14711479
"outer",
14721480
"right",
14731481
],
1474-
left_col_ids: typing.Sequence[str],
1475-
right_col_ids: typing.Sequence[str],
1482+
left_join_ids: typing.Sequence[str],
1483+
right_join_ids: typing.Sequence[str],
14761484
sort: bool,
14771485
suffixes: tuple[str, str] = ("_x", "_y"),
14781486
) -> Block:
1479-
(
1480-
joined_expr,
1481-
coalesced_join_cols,
1482-
(get_column_left, get_column_right),
1483-
) = joins.join_by_column(
1487+
joined_expr = joins.join_by_column(
14841488
self.expr,
1485-
left_col_ids,
1489+
left_join_ids,
14861490
other.expr,
1487-
right_col_ids,
1491+
right_join_ids,
14881492
how=how,
1489-
sort=sort,
14901493
)
1494+
get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER(
1495+
self.expr.column_ids, other.expr.column_ids
1496+
)
1497+
result_columns = []
1498+
matching_join_labels = []
1499+
1500+
coalesced_ids = []
1501+
for left_id, right_id in zip(left_join_ids, right_join_ids):
1502+
coalesced_id = guid.generate_guid()
1503+
joined_expr = joined_expr.project_binary_op(
1504+
get_column_left[left_id],
1505+
get_column_right[right_id],
1506+
ops.coalesce_op,
1507+
coalesced_id,
1508+
)
1509+
coalesced_ids.append(coalesced_id)
1510+
1511+
for col_id in self.value_columns:
1512+
if col_id in left_join_ids:
1513+
key_part = left_join_ids.index(col_id)
1514+
matching_right_id = right_join_ids[key_part]
1515+
if (
1516+
self.col_id_to_label[col_id]
1517+
== other.col_id_to_label[matching_right_id]
1518+
):
1519+
matching_join_labels.append(self.col_id_to_label[col_id])
1520+
result_columns.append(coalesced_ids[key_part])
1521+
else:
1522+
result_columns.append(get_column_left[col_id])
1523+
else:
1524+
result_columns.append(get_column_left[col_id])
1525+
for col_id in other.value_columns:
1526+
if col_id in right_join_ids:
1527+
key_part = right_join_ids.index(col_id)
1528+
if other.col_id_to_label[matching_right_id] in matching_join_labels:
1529+
pass
1530+
else:
1531+
result_columns.append(get_column_right[col_id])
1532+
else:
1533+
result_columns.append(get_column_right[col_id])
14911534

1492-
# which join key parts should be coalesced
1493-
merge_join_key_mask = [
1494-
str(self.col_id_to_label[left_id]) == str(other.col_id_to_label[right_id])
1495-
for left_id, right_id in zip(left_col_ids, right_col_ids)
1496-
]
1497-
labels_to_coalesce = [
1498-
self.col_id_to_label[col_id]
1499-
for i, col_id in enumerate(left_col_ids)
1500-
if merge_join_key_mask[i]
1501-
]
1502-
1503-
def left_col_mapping(col_id: str) -> str:
1504-
if col_id in left_col_ids:
1505-
join_key_part = left_col_ids.index(col_id)
1506-
if merge_join_key_mask[join_key_part]:
1507-
return coalesced_join_cols[join_key_part]
1508-
return get_column_left(col_id)
1509-
1510-
def right_col_mapping(col_id: str) -> typing.Optional[str]:
1511-
if col_id in right_col_ids:
1512-
join_key_part = right_col_ids.index(col_id)
1513-
if merge_join_key_mask[join_key_part]:
1514-
return None
1515-
return get_column_right(col_id)
1516-
1517-
left_columns = [left_col_mapping(col_id) for col_id in self.value_columns]
1518-
1519-
right_columns = [
1520-
typing.cast(str, right_col_mapping(col_id))
1521-
for col_id in other.value_columns
1522-
if right_col_mapping(col_id)
1523-
]
1535+
if sort:
1536+
# sort uses coalesced join keys always
1537+
joined_expr = joined_expr.order_by(
1538+
[ordering.OrderingColumnReference(col_id) for col_id in coalesced_ids],
1539+
stable=True,
1540+
)
15241541

1525-
expr = joined_expr.select_columns([*left_columns, *right_columns])
1542+
joined_expr = joined_expr.select_columns(result_columns)
15261543
labels = utils.merge_column_labels(
15271544
self.column_labels,
15281545
other.column_labels,
1529-
coalesce_labels=labels_to_coalesce,
1546+
coalesce_labels=matching_join_labels,
15301547
suffixes=suffixes,
15311548
)
1532-
15331549
# Constructs default index
1534-
expr, offset_index_id = expr.promote_offsets()
1550+
offset_index_id = guid.generate_guid()
1551+
expr = joined_expr.promote_offsets(offset_index_id)
15351552
return Block(expr, index_columns=[offset_index_id], column_labels=labels)
15361553

15371554
def _force_reproject(self) -> Block:

0 commit comments

Comments
 (0)