Merged
Show file tree
Hide file tree
Changes from all commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b3771b8
feat: Add Series.peek to preview data efficiently
TrevorBergeronMay 28, 2024
e865395
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronMay 30, 2024
f227476
add another test
TrevorBergeronMay 30, 2024
68fc1e1
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronMay 31, 2024
a17e027
cleanup comments
TrevorBergeronMay 31, 2024
1764106
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronJun 4, 2024
5ff4661
more comments, up to 4 cluster cols for session-based caching
TrevorBergeronJun 4, 2024
936e73d
add another session caching test
TrevorBergeronJun 4, 2024
41f6083
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronJun 4, 2024
ffbc518
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronJun 5, 2024
a9b16c4
add todo for geo predicate detection
TrevorBergeronJun 5, 2024
83fc8fb
Merge branch 'main' into series_cache
tswastJun 12, 2024
ec1d973
add dtype clusterable and orderable property
TrevorBergeronJun 12, 2024
c307625
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronJun 13, 2024
b917c71
fix session aware caching unit tests
TrevorBergeronJun 13, 2024
848d0a4
mock session for planner test
TrevorBergeronJun 13, 2024
79d05b5
fix offsets column name collision
TrevorBergeronJun 25, 2024
06c9866
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronJun 25, 2024
2ed1520
Update bigframes/dtypes.py
TrevorBergeronJun 25, 2024
1ff4f68
Update bigframes/dtypes.py
TrevorBergeronJun 25, 2024
81e5a02
add another series peek test
TrevorBergeronJun 25, 2024
e91dbb5
remove partial comment
TrevorBergeronJun 26, 2024
108e449
Merge remote-tracking branch '/main' into series_cache
TrevorBergeronJun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bigframes.core.expression as ex
import bigframes.core.schema as schemata
import bigframes.dtypes
import bigframes.operations as ops

LOW_CARDINALITY_TYPES = [bigframes.dtypes.BOOL_DTYPE]

COMPARISON_OP_TYPES = tuple(
type(i)
for i in (
ops.eq_op,
ops.eq_null_match_op,
ops.ne_op,
ops.gt_op,
ops.ge_op,
ops.lt_op,
ops.le_op,
)
)


def cluster_cols_for_predicate(
predicate: ex.Expression, schema: schemata.ArraySchema
) -> list[str]:
"""Try to determine cluster col candidates that work with given predicates."""
# TODO: Prioritize based on predicted selectivity (eg. equality conditions are probably very selective)
if isinstance(predicate, ex.UnboundVariableExpression):
cols = [predicate.id]
elif isinstance(predicate, ex.OpExpression):
op = predicate.op
# TODO: Support geo predicates, which support pruning if clustered (other than st_disjoint)
# https://cloud.google.com/bigquery/docs/reference/standard-sql/geography_functions
if isinstance(op, COMPARISON_OP_TYPES):
cols = cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1])
elif isinstance(op, (type(ops.invert_op))):
cols = cluster_cols_for_predicate(predicate.inputs[0], schema)
elif isinstance(op, (type(ops.and_op), type(ops.or_op))):
left_cols = cluster_cols_for_predicate(predicate.inputs[0], schema)
right_cols = cluster_cols_for_predicate(predicate.inputs[1], schema)
cols = [*left_cols, *[col for col in right_cols if col not in left_cols]]
else:
cols = []
else:
# Constant
cols = []
return [
col for col in cols if bigframes.dtypes.is_clusterable(schema.get_type(col))
]


def cluster_cols_for_comparison(
left_ex: ex.Expression, right_ex: ex.Expression
) -> list[str]:
# TODO: Try to normalize expressions such that one side is a single variable.
# eg. Convert -cola>=3 to cola<-3 and colb+3 < 4 to colb < 1
if left_ex.is_const:
# There are some invertible ops that would also be ok
if isinstance(right_ex, ex.UnboundVariableExpression):
return [right_ex.id]
elif right_ex.is_const:
if isinstance(left_ex, ex.UnboundVariableExpression):
return [left_ex.id]
return []
Original file line numberDiff line numberDiff line change
Expand Up@@ -74,52 +74,95 @@ class SimpleDtypeInfo:
logical_bytes: int = (
8 # this is approximate only, some types are variably sized, also, compression
)
orderable: bool = False
clusterable: bool = False


# TODO: Missing BQ types: INTERVAL, JSON, RANGE
# TODO: Add mappings to python types
SIMPLE_TYPES = (
SimpleDtypeInfo(
dtype=INT_DTYPE, arrow_dtype=pa.int64(), type_kind=("INT64", "INTEGER")
dtype=INT_DTYPE,
arrow_dtype=pa.int64(),
type_kind=("INT64", "INTEGER"),
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(
dtype=FLOAT_DTYPE, arrow_dtype=pa.float64(), type_kind=("FLOAT64", "FLOAT")
dtype=FLOAT_DTYPE,
arrow_dtype=pa.float64(),
type_kind=("FLOAT64", "FLOAT"),
orderable=True,
),
SimpleDtypeInfo(
dtype=BOOL_DTYPE,
arrow_dtype=pa.bool_(),
type_kind=("BOOL", "BOOLEAN"),
logical_bytes=1,
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(dtype=STRING_DTYPE, arrow_dtype=pa.string(), type_kind=("STRING",)),
SimpleDtypeInfo(
dtype=DATE_DTYPE, arrow_dtype=pa.date32(), type_kind=("DATE",), logical_bytes=4
dtype=STRING_DTYPE,
arrow_dtype=pa.string(),
type_kind=("STRING",),
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(dtype=TIME_DTYPE, arrow_dtype=pa.time64("us"), type_kind=("TIME",)),
SimpleDtypeInfo(
dtype=DATETIME_DTYPE, arrow_dtype=pa.timestamp("us"), type_kind=("DATETIME",)
dtype=DATE_DTYPE,
arrow_dtype=pa.date32(),
type_kind=("DATE",),
logical_bytes=4,
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(
dtype=TIME_DTYPE,
arrow_dtype=pa.time64("us"),
type_kind=("TIME",),
orderable=True,
),
SimpleDtypeInfo(
dtype=DATETIME_DTYPE,
arrow_dtype=pa.timestamp("us"),
type_kind=("DATETIME",),
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(
dtype=TIMESTAMP_DTYPE,
arrow_dtype=pa.timestamp("us", tz="UTC"),
type_kind=("TIMESTAMP",),
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(
dtype=BYTES_DTYPE, arrow_dtype=pa.binary(), type_kind=("BYTES",), orderable=True
),
SimpleDtypeInfo(dtype=BYTES_DTYPE, arrow_dtype=pa.binary(), type_kind=("BYTES",)),
SimpleDtypeInfo(
dtype=NUMERIC_DTYPE,
arrow_dtype=pa.decimal128(38, 9),
type_kind=("NUMERIC",),
logical_bytes=16,
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(
dtype=BIGNUMERIC_DTYPE,
arrow_dtype=pa.decimal256(76, 38),
type_kind=("BIGNUMERIC",),
logical_bytes=32,
orderable=True,
clusterable=True,
),
# Geo has no corresponding arrow dtype
SimpleDtypeInfo(
dtype=GEO_DTYPE, arrow_dtype=None, type_kind=("GEOGRAPHY",), logical_bytes=40
dtype=GEO_DTYPE,
arrow_dtype=None,
type_kind=("GEOGRAPHY",),
logical_bytes=40,
clusterable=True,
),
)

Expand DownExpand Up@@ -209,9 +252,25 @@ def is_comparable(type: ExpressionType) -> bool:
return (type is not None) and is_orderable(type)


_ORDERABLE_SIMPLE_TYPES = set(
mapping.dtype for mapping in SIMPLE_TYPES if mapping.orderable
)


def is_orderable(type: ExpressionType) -> bool:
# On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
return not is_array_like(type) and not is_struct_like(type) and (type != GEO_DTYPE)
return type in _ORDERABLE_SIMPLE_TYPES


_CLUSTERABLE_SIMPLE_TYPES = set(
mapping.dtype for mapping in SIMPLE_TYPES if mapping.clusterable
)


def is_clusterable(type: ExpressionType) -> bool:
# https://cloud.google.com/bigquery/docs/clustered-tables#cluster_column_types
# This is based on default database type mapping, could in theory represent in non-default bq type to cluster.
return type in _CLUSTERABLE_SIMPLE_TYPES


def is_bool_coercable(type: ExpressionType) -> bool:
Expand Down
Loading