Changes from 1 commit
@@ -0,0 +1,64 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Sequence

import bigframes.core.expression as ex
import bigframes.operations as ops

COMPARISON_OP_TYPES = tuple(
    type(i)
    for i in (
        ops.eq_op,
        ops.eq_null_match_op,
        ops.ne_op,
        ops.gt_op,
        ops.ge_op,
        ops.lt_op,
        ops.le_op,
    )
)
def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]:
    """Try to determine cluster col candidates that work with given predicates."""
    if isinstance(predicate, ex.UnboundVariableExpression):
        return [predicate.id]
    if isinstance(predicate, ex.OpExpression):
        op = predicate.op
        if isinstance(op, COMPARISON_OP_TYPES):
            return cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1])
        if isinstance(op, type(ops.invert_op)):
[Review comment] Let's add a TODO for geo, too. Looks like functions like …
[Reply] added todo
            return cluster_cols_for_predicate(predicate.inputs[0])
        if isinstance(op, (type(ops.and_op), type(ops.or_op))):
            left_cols = cluster_cols_for_predicate(predicate.inputs[0])
            right_cols = cluster_cols_for_predicate(predicate.inputs[1])
            return [*left_cols, *[col for col in right_cols if col not in left_cols]]
        else:
            return []
    else:
        # Constant
        return []


def cluster_cols_for_comparison(
    left_ex: ex.Expression, right_ex: ex.Expression
) -> Sequence[str]:
    if left_ex.is_const:
        if isinstance(right_ex, ex.UnboundVariableExpression):
            return [right_ex.id]
    elif right_ex.is_const:
        if isinstance(left_ex, ex.UnboundVariableExpression):
            return [left_ex.id]
    return []
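The pruning helpers above only ever nominate a column when a comparison pins an unbound variable against a constant; column-vs-column comparisons and other ops yield no candidates. A minimal self-contained sketch of the same idea, using stand-in expression classes rather than the real bigframes.core.expression types (the class names below are illustrative, not the bigframes API):

from dataclasses import dataclass
from typing import Sequence

@dataclass(frozen=True)
class Var:          # stand-in for ex.UnboundVariableExpression
    id: str

@dataclass(frozen=True)
class Const:        # stand-in for a constant expression
    value: object

@dataclass(frozen=True)
class Op:           # stand-in for ex.OpExpression
    name: str       # e.g. "eq", "and", "invert"
    inputs: tuple

COMPARISONS = {"eq", "ne", "gt", "ge", "lt", "le"}

def cluster_cols(pred) -> Sequence[str]:
    if isinstance(pred, Var):
        return [pred.id]
    if isinstance(pred, Op):
        if pred.name in COMPARISONS:
            left, right = pred.inputs
            # Only a column-vs-constant comparison yields a candidate.
            if isinstance(left, Const) and isinstance(right, Var):
                return [right.id]
            if isinstance(right, Const) and isinstance(left, Var):
                return [left.id]
            return []
        if pred.name == "invert":
            return cluster_cols(pred.inputs[0])
        if pred.name in ("and", "or"):
            left = cluster_cols(pred.inputs[0])
            right = cluster_cols(pred.inputs[1])
            return [*left, *[c for c in right if c not in left]]
        return []
    return []  # constants contribute nothing

# (col_a > 5) & ~(col_b == col_c): only col_a is usable for clustering,
# because col_b == col_c compares two columns rather than pinning one.
pred = Op("and", (Op("gt", (Var("col_a"), Const(5))),
                  Op("invert", (Op("eq", (Var("col_b"), Var("col_c"))),))))
assert list(cluster_cols(pred)) == ["col_a"]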
@@ -16,7 +16,6 @@
from __future__ import annotations

import collections.abc
import copy
import datetime
import logging
@@ -85,6 +84,7 @@
import bigframes.core.nodes as nodes
from bigframes.core.ordering import IntegerEncoding
import bigframes.core.ordering as order
import bigframes.core.pruning
import bigframes.core.tree_properties as traversals
import bigframes.core.tree_properties as tree_properties
import bigframes.core.utils as utils
@@ -100,6 +100,7 @@
import bigframes.session._io.bigquery as bf_io_bigquery
import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
import bigframes.session.clients
import bigframes.session.planner
import bigframes.version

# Avoid circular imports.
@@ -326,13 +327,15 @@ def session_id(self):
    @property
    def objects(
        self,
-    ) -> collections.abc.Set[
+    ) -> Tuple[
[Review comment] Technically a breaking change. Maybe OK since we didn't actually document this property, but might be better to change from …
[Reply] Hmm, yeah, actually, should we just make this private? Added this very recently and wasn't really intending this for user consumption.
        Union[
            bigframes.core.indexes.Index, bigframes.series.Series, dataframe.DataFrame
        ]
    ]:
        still_alive = [i for i in self._objects if i() is not None]
        self._objects = still_alive
        # Materialize strong references; be careful not to hold onto these needlessly, as that will prevent garbage collection.
-        return set(i() for i in self._objects if i() is not None)  # type: ignore
+        return tuple(i() for i in self._objects if i() is not None)  # type: ignore
    @property
    def _project(self):

@@ -1913,6 +1916,18 @@ def _cache_with_offsets(self, array_value: core.ArrayValue):
        ).node
        self._cached_executions[array_value.node] = cached_replacement
    def _session_aware_caching(self, array_value: core.ArrayValue) -> None:
[Review comment] Let's verbify this. Suggested change: …
[Reply] done
        # this is the occurrence count across the whole session
        forest = [obj._block.expr.node for obj in self.objects]
        target, cluster_col = bigframes.session.planner.session_aware_cache_plan(
            array_value.node, forest
        )
        if cluster_col:
            self._cache_with_cluster_cols(core.ArrayValue(target), [cluster_col])
        else:
            self._cache_with_offsets(core.ArrayValue(target))

    def _simplify_with_caching(self, array_value: core.ArrayValue):
        """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces."""
        # Apply existing caching first
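The objects property above is the hook the new caching path uses to see every live DataFrame, Series, and Index in the session: the session stores only weak references and materializes strong ones on demand, so registered objects can still be garbage collected. A stand-alone sketch of that pattern under the same assumption (class and method names here are illustrative, not the bigframes API):

import gc
import weakref
from typing import Tuple

class Session:
    def __init__(self):
        self._objects: list = []  # weakref.ref entries, never strong refs

    def register(self, obj) -> None:
        self._objects.append(weakref.ref(obj))

    @property
    def objects(self) -> Tuple:
        # Drop weakrefs whose targets were garbage collected.
        self._objects = [r for r in self._objects if r() is not None]
        # Materialize strong references only for the caller; stashing this
        # tuple somewhere would keep the objects alive indefinitely.
        return tuple(r() for r in self._objects if r() is not None)

class Frame:  # stand-in for a DataFrame/Series/Index
    pass

session = Session()
f = Frame()
session.register(f)
assert len(session.objects) == 1
del f            # last strong reference gone
gc.collect()
assert len(session.objects) == 0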
@@ -0,0 +1,62 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Optional, Sequence, Tuple

import bigframes.core.expression as ex
import bigframes.core.nodes as nodes
import bigframes.core.pruning as predicate_pruning
import bigframes.core.tree_properties as traversals


def session_aware_cache_plan(
    root: nodes.BigFrameNode, session_forest: Sequence[nodes.BigFrameNode]
) -> Tuple[nodes.BigFrameNode, Optional[str]]:
    """
    Determines the best node to cache given a target and a list of object roots for objects in a session.

    Returns the node to cache, and optionally a clustering column.
""" | ||||||
node_counts = traversals.count_nodes(session_forest) | ||||||
# These node types are cheap to re-compute | ||||||
[Review comment] Let's complete the thought in this comment for clarity. Suggested change: …
[Reply] done
    de_cachable_types = (nodes.FilterNode, nodes.ProjectionNode)
    caching_target = cur_node = root
    caching_target_refs = node_counts.get(caching_target, 0)
    filters: list[
        ex.Expression
    ] = []  # accumulate filters into this as we traverse downwards
    cluster_col: Optional[str] = None
    while isinstance(cur_node, de_cachable_types):
        if isinstance(cur_node, nodes.FilterNode):
            filters.append(cur_node.predicate)
        elif isinstance(cur_node, nodes.ProjectionNode):
            bindings = {name: expr for expr, name in cur_node.assignments}
            filters = [i.bind_all_variables(bindings) for i in filters]
        cur_node = cur_node.child
        cur_node_refs = node_counts.get(cur_node, 0)
        if cur_node_refs > caching_target_refs:
            caching_target, caching_target_refs = cur_node, cur_node_refs
[Review comment] Do we need to do anything to make sure we aren't selecting more columns than needed? I have some worries that column selection wouldn't have the desired effect. Though, I suppose that'll only matter with unordered + unindexed DataFrames due to our hashing of the row. Maybe worth a TODO to be resolved with that project? That said, I'd be curious to see if unordered/unindexed would benefit from caching at all due to the difficulties of using the cache in row identity joins.
[Reply] Row hashing shouldn't matter, as that only happens for the initial table scan, which shouldn't need to be cached. However, yes, we could try to prune columns unused by the session before caching. We would need to be careful not to invalidate existing caching or the join->projection rewriter, but it should be possible. This could be done in a few ways, such as a partial cache (containing only some columns), or by rewriting all the session BFETs with a column-pruning pass before caching.
    cluster_col = None
    # Just pick the first cluster-compatible predicate
[Review comment] TODO to sort by a selectivity heuristic? Seems like this layer might make more sense than …
[Reply] added todo
    for predicate in filters:
        # Cluster cols only consider the target object and not other session objects
        cluster_cols = predicate_pruning.cluster_cols_for_predicate(predicate)
        if len(cluster_cols) > 0:
            cluster_col = cluster_cols[0]
            break
    return caching_target, cluster_col
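Putting the pieces together: session_aware_cache_plan descends through nodes that are cheap to recompute, counts how often each subtree occurs across all session objects, and picks the most widely shared subtree as the caching target. A toy version of just the target-selection part over hashable stand-in nodes (the real BigFrameNode types and count_nodes are assumed; this unary-chain Node is purely illustrative):

from collections import Counter
from dataclasses import dataclass
from typing import Optional, Sequence

@dataclass(frozen=True)
class Node:
    name: str
    child: Optional["Node"] = None
    cheap: bool = False  # True for filter/projection-like nodes

def count_nodes(forest: Sequence[Node]) -> Counter:
    counts: Counter = Counter()
    for root in forest:
        node: Optional[Node] = root
        while node is not None:  # toy trees are unary chains
            counts[node] += 1
            node = node.child
    return counts

def cache_target(root: Node, forest: Sequence[Node]) -> Node:
    counts = count_nodes(forest)
    target, best = root, counts.get(root, 0)
    node = root
    # Descend only through nodes that are cheap to recompute; caching
    # below them lets many session objects share one cached result.
    while node.cheap and node.child is not None:
        node = node.child
        refs = counts.get(node, 0)
        if refs > best:
            target, best = node, refs
    return target

base = Node("scan")
shared = Node("projection", child=base, cheap=True)
df1 = Node("filter_a", child=shared, cheap=True)
df2 = Node("filter_b", child=shared, cheap=True)
# Caching `shared` serves both df1 and df2; caching df1's root would not.
assert cache_target(df1, [df1, df2]) is shared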