Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BSE-4358: BodoSQL support for S3 tables #126

Merged
merged 81 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
e76ddfb
Start planner API movement
njriasan Jan 2, 2025
64812e8
Updated the entry point for parseQuery
njriasan Jan 2, 2025
b1bbc8a
Fixed the import
njriasan Jan 2, 2025
88e6673
Merge branch 'main' into nick/bodosql_python_interface
njriasan Jan 2, 2025
325486c
Merge branch 'main' into nick/bodosql_python_interface
njriasan Jan 2, 2025
85cbbad
Moved getOptimizedPlanString
njriasan Jan 3, 2025
777458d
Moved getPandasAndPlanString
njriasan Jan 3, 2025
293e3b8
Moved getPandasString
njriasan Jan 3, 2025
2a835ec
Moved getWriteType
njriasan Jan 3, 2025
f9d9611
Moved executeDDL
njriasan Jan 3, 2025
abab715
Moved the last RelationalAlgebra API
njriasan Jan 3, 2025
1ec8501
Added missing docstrings [run CI]
njriasan Jan 3, 2025
7f9d2e9
Fixed the import ordering [run CI]
njriasan Jan 3, 2025
7da882e
Move reset to package private [run CI]
njriasan Jan 3, 2025
6d02755
Updated fromTypeID API [run CI]
njriasan Jan 3, 2025
2a943b9
Updated WriteTargetEnum [run CI]
njriasan Jan 3, 2025
add7295
Updated the logger API [run CI]
njriasan Jan 3, 2025
d8665d5
Fixed save issue [run CI]
njriasan Jan 3, 2025
28cce7d
Fixed API copying issue [run CI]
njriasan Jan 3, 2025
7ac23dc
Merge branch 'nick/bodosql_python_interface' into nick/fix_BodoSQLCol…
njriasan Jan 3, 2025
af1a9cf
Added missing static declaration [run CI]
njriasan Jan 3, 2025
864ee4e
Merge branch 'nick/bodosql_python_interface' into nick/fix_BodoSQLCol…
njriasan Jan 3, 2025
48605d6
Added more missing static declarations [run CI]
njriasan Jan 3, 2025
1b909c9
Merge branch 'main' into nick/bodosql_python_interface [run CI]
njriasan Jan 3, 2025
04db869
Removed ArrayList
njriasan Jan 3, 2025
80fd441
Removed hash map interaction
njriasan Jan 3, 2025
98bafde
Added stack trace API
njriasan Jan 3, 2025
052ce93
Removed properties
njriasan Jan 3, 2025
c25928c
Added builders
njriasan Jan 3, 2025
2349f14
Add local table builder
njriasan Jan 3, 2025
142c45f
Update LocalSchema
njriasan Jan 3, 2025
7febfe5
Removed LocalSchemaClass [run CI]
njriasan Jan 3, 2025
f87b383
Removed more dead code [run CI]
njriasan Jan 3, 2025
bca4fdc
Fixed bugs [run CI]
njriasan Jan 3, 2025
3b36b27
Merge branch 'nick/bodosql_python_interface' into nick/fix_BodoSQLCol…
njriasan Jan 3, 2025
688f131
Merge branch 'nick/fix_BodoSQLColumn' into nick/remove_constructors_b…
njriasan Jan 3, 2025
6918eb9
Removed planner type
njriasan Jan 3, 2025
09f51b6
Removed DDLExecutionResult calls
njriasan Jan 3, 2025
7ebec08
Added get lowered globals
njriasan Jan 3, 2025
f484159
Refactored pair access
njriasan Jan 3, 2025
1c5d5be
Fixed the exception interface
njriasan Jan 3, 2025
b1cee44
Removed ColumnDataTypeClass
njriasan Jan 3, 2025
76a0c56
Removed ColumnDataTypeClass
njriasan Jan 3, 2025
1e0139e
Removed everything but the constructors
njriasan Jan 3, 2025
f63733d
Refactored RelationalAlgebraGenerator constructor
njriasan Jan 3, 2025
374e4bf
Removed RelationalAlgebraGeneratorClass [run CI]
njriasan Jan 3, 2025
db61339
Merge branch 'main' into nick/bodosql_python_interface [run CI]
njriasan Jan 4, 2025
4b7890e
Merge branch 'nick/bodosql_python_interface' into nick/fix_BodoSQLCol…
njriasan Jan 4, 2025
a9c46f3
Merge branch 'nick/fix_BodoSQLColumn' into nick/remove_constructors_b…
njriasan Jan 4, 2025
10952b6
Merge branch 'nick/remove_constructors_bodosql' into nick/remove_rema…
njriasan Jan 4, 2025
395d644
Fixed map APIs [run CI]
njriasan Jan 5, 2025
95052cb
Merge branch 'nick/remove_constructors_bodosql' into nick/remove_rema…
njriasan Jan 5, 2025
2ca5b36
Fixed create_java_dynamic_parameter_type_list [run CI]
njriasan Jan 6, 2025
db9edfa
Merge branch 'nick/remove_constructors_bodosql' into nick/remove_rema…
njriasan Jan 6, 2025
d066870
Fixed merge conflict [run CI]
njriasan Jan 6, 2025
c873e60
Merge branch 'nick/fix_BodoSQLColumn' into nick/remove_constructors_b…
njriasan Jan 6, 2025
2dfb822
Apply Isaac's feedback [run CI]
njriasan Jan 6, 2025
c66bafc
Merge branch 'nick/remove_constructors_bodosql' into nick/remove_rema…
njriasan Jan 6, 2025
fcebae7
Merged with main [run CI]
njriasan Jan 6, 2025
0fb2a07
Merge branch 'nick/remove_constructors_bodosql' into nick/remove_rema…
njriasan Jan 6, 2025
2fddacb
Fixed APIs
njriasan Jan 6, 2025
3164ada
moved definitions [run CI]
njriasan Jan 6, 2025
86947e4
Add S3TablesCatalog skeleton
IsaacWarren Jan 6, 2025
e91d009
Add implementations for bodosql catalog
IsaacWarren Jan 6, 2025
0f7b77c
Add BodoS3TablesCatalog to PythonEntryPoint
IsaacWarren Jan 7, 2025
550691b
Add s3_tables_catalog to bodosql
IsaacWarren Jan 7, 2025
0585d51
Add s3_tables_catalog fixture
IsaacWarren Jan 7, 2025
01fe6ac
Actually cleanup written table
IsaacWarren Jan 7, 2025
d712c80
Add basic s3 tables read test
IsaacWarren Jan 7, 2025
a7cd361
Add write test
IsaacWarren Jan 7, 2025
43844d7
Update docs
IsaacWarren Jan 7, 2025
e9372c4
Allow catalogs to not specify a default schema
IsaacWarren Jan 7, 2025
d5bb594
Don't give default schema
IsaacWarren Jan 7, 2025
c015ad2
[Run CI]
IsaacWarren Jan 7, 2025
43f60fc
Merge remote-tracking branch 'origin/main' into isaac/bodosql_s3_tables
IsaacWarren Jan 8, 2025
5a8c201
[Run CI]
IsaacWarren Jan 8, 2025
6e2cc69
Fix docstring
IsaacWarren Jan 9, 2025
21de7a1
Fix doc
IsaacWarren Jan 9, 2025
d4389f5
Fix comment
IsaacWarren Jan 9, 2025
06c9ed3
Add infra requirements to docstrings
IsaacWarren Jan 9, 2025
7095e2d
[Run CI]
IsaacWarren Jan 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions BodoSQL/bodosql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
GlueCatalogType,
get_glue_connection,
)
from bodosql.bodosql_types.s3_tables_catalog import (
S3TablesCatalog,
S3TablesCatalogType,
get_s3_tables_connection,
)

import bodosql.context_ext
import bodosql.ddl_ext
Expand Down
182 changes: 182 additions & 0 deletions BodoSQL/bodosql/bodosql_types/s3_tables_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""Python and JIT class for describing a S3 Tables Iceberg catalog. A S3 Tables
catalog contains all information needed to connect to S3 Tables for organizing and modifying tables.
"""

import numba
from numba.core import cgutils, types
from numba.extending import (
NativeValue,
box,
intrinsic,
make_attribute_wrapper,
models,
overload,
register_model,
typeof_impl,
unbox,
)

from bodo.io.iceberg import IcebergConnectionType
from bodo.utils.typing import get_literal_value, raise_bodo_error
from bodosql import DatabaseCatalog, DatabaseCatalogType
from bodosql.imported_java_classes import JavaEntryPoint


def _create_java_s3_tables_catalog(warehouse: str):
"""
Create a Java BodoS3Tables object.
Args:
warehouse (str): The warehouse to connect to.
Returns:
JavaObject: A Java BodoS3Tables object.
"""
return JavaEntryPoint.buildBodoS3TablesCatalog(warehouse)


class S3TablesCatalog(DatabaseCatalog):
"""
Python class for storing the information
needed to connect to a S3 Tables Iceberg catalog.
"""

def __init__(self, warehouse: str):
"""
Create a S3 Tables catalog from a connection string to a S3 Tables catalog.
Args:
warehouse (str): The warehouse to connect to.
"""
self.warehouse = warehouse

def get_java_object(self):
return _create_java_s3_tables_catalog(self.warehouse)

def __eq__(self, other):
if not isinstance(other, S3TablesCatalog):
return False
return self.warehouse == other.warehouse


@overload(S3TablesCatalog, no_unliteral=True)
def overload_s3_tables_catalog_constructor(warehouse: str):
raise_bodo_error("S3TablesCatalog: Cannot be created in JIT mode.")


class S3TablesCatalogType(DatabaseCatalogType):
def __init__(self, warehouse: str):
self.warehouse = warehouse
super().__init__(name=f"S3TablesCatalog({self.warehouse=})")

def get_java_object(self):
return _create_java_s3_tables_catalog(self.warehouse)

@property
def key(self):
return self.warehouse


@typeof_impl.register(S3TablesCatalog)
def typeof_s3_tables_catalog(val, c):
return S3TablesCatalogType(warehouse=val.warehouse)


register_model(S3TablesCatalogType)(models.OpaqueModel)


@box(S3TablesCatalogType)
def box_s3_tables_catalog_type(typ, val, c):
"""
Box a S3 Tables Catalog native representation into a Python object. We populate
the contents based on typing information.
"""
warehouse_obj = c.pyapi.from_native_value(
types.unicode_type,
c.context.get_constant_generic(c.builder, types.unicode_type, typ.warehouse),
c.env_manager,
)
s3_tables_catalog_obj = c.pyapi.unserialize(
c.pyapi.serialize_object(S3TablesCatalog)
)
res = c.pyapi.call_function_objargs(s3_tables_catalog_obj, (warehouse_obj,))
c.pyapi.decref(warehouse_obj)
c.pyapi.decref(s3_tables_catalog_obj)
return res


@unbox(S3TablesCatalogType)
def unbox_s3_tables_catalog_type(typ, val, c):
"""
Unbox a S3 Tables Catalog Python object into its native representation.
Since the actual model is opaque we can just generate a dummy.
"""
return NativeValue(c.context.get_dummy_value())


@numba.jit(types.unicode_type(types.unicode_type))
def get_conn_str(warehouse):
"""Get the connection string for a S3 Tables Iceberg catalog."""
return warehouse


class S3TablesConnectionType(IcebergConnectionType):
"""
Python class for storing the information needed to connect to a S3 Tables Iceberg catalog.
The compiler can get a connection string using the get_conn_str function.
The runtime can get a connection string using the conn_str attribute.
"""

def __init__(self, warehouse):
self.warehouse = warehouse
self.conn_str = get_conn_str(warehouse)

super().__init__(name=f"S3TablesConnectionType({warehouse=})")

def get_conn_str(self) -> str:
return "iceberg+" + self.conn_str


@intrinsic
def _get_s3_tables_connection(typingctx, warehouse, conn_str):
"""Create a struct model for a S3TablesonnectionType from a warehouse and connection string."""
literal_warehouse = get_literal_value(warehouse)
s3_tables_connection_type = S3TablesConnectionType(literal_warehouse)

def codegen(context, builder, sig, args):
"""lowering code to initialize a S3TablesConnectionType"""
s3_tables_connection_type = sig.return_type
s3_tables_connection_struct = cgutils.create_struct_proxy(
s3_tables_connection_type
)(context, builder)
context.nrt.incref(builder, sig.args[1], args[1])
s3_tables_connection_struct.conn_str = args[1]
return s3_tables_connection_struct._getvalue()

return s3_tables_connection_type(warehouse, conn_str), codegen


def get_s3_tables_connection(warehouse: str):
pass


@overload(get_s3_tables_connection, no_unliteral=True)
def overload_get_s3_tables_connection(warehouse: str):
"""Overload for get_s3_tables_connection that creates a S3TablesConnectionType."""

def impl(warehouse: str): # pragma: no cover
conn_str = get_conn_str(warehouse)
return _get_s3_tables_connection(warehouse, conn_str)

return impl


@register_model(S3TablesConnectionType)
class S3TablesConnectionModel(models.StructModel):
"""Model for S3TablesConnectionType that has one member, conn_str."""

def __init__(self, dmm, fe_type):
members = [
("conn_str", types.unicode_type),
]
super().__init__(dmm, fe_type, members)


make_attribute_wrapper(S3TablesConnectionType, "conn_str", "conn_str")
4 changes: 3 additions & 1 deletion BodoSQL/bodosql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,9 @@ def _create_generator(self, hide_credentials: bool):
self.default_tz,
)

def add_or_replace_view(self, name: str, table: pd.DataFrame | TablePath):
def add_or_replace_view(
self, name: str, table: pd.DataFrame | TablePath
) -> BodoSQLContext:
"""Create a new BodoSQLContext that contains all of the old DataFrames and the
new table being provided. If there is a DataFrame in the old BodoSQLContext with
the same name, it is replaced by the new table in the new BodoSQLContext. Otherwise
Expand Down
13 changes: 12 additions & 1 deletion BodoSQL/bodosql/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
tabular_connection,
)
from bodo.tests.iceberg_database_helpers.utils import get_spark
from bodo.tests.utils import gen_nonascii_list
from bodo.tests.utils import gen_nonascii_list, temp_env_override

# Patch to avoid PySpark's Py4j exception handler in testing.
# See:
Expand Down Expand Up @@ -1973,6 +1973,17 @@ def glue_catalog():
return bodosql.GlueCatalog(warehouse=warehouse)


@pytest.fixture
def s3_tables_catalog():
"""
Returns a s3 tables catalog object
"""

warehouse = "arn:aws:s3tables:us-east-2:427443013497:bucket/unittest-bucket"
with temp_env_override({"AWS_REGION": "us-east-2"}):
yield bodosql.S3TablesCatalog(warehouse=warehouse)


@pytest.fixture(
params=[
pytest.param(
Expand Down
113 changes: 113 additions & 0 deletions BodoSQL/bodosql/tests/test_types/test_s3_tables_iceberg_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import pandas as pd

import bodo
import bodosql
from bodo.tests.utils import (
assert_tables_equal,
check_func,
gen_unique_table_id,
pytest_s3_tables,
run_rank0,
)
from bodosql.bodosql_types.s3_tables_catalog import S3TablesConnectionType

pytestmark = pytest_s3_tables


# Refer to bodo/tests/test_s3_tables_iceberg.py for infrastructure
# required to run these tests
def test_basic_read(memory_leak_check, s3_tables_catalog):
"""
Test reading an entire Iceberg table from S3 Tables in SQL
"""
bc = bodosql.BodoSQLContext(catalog=s3_tables_catalog)

def impl(bc, query):
return bc.sql(query)

py_out = pd.DataFrame(
{
"A": ["ally", "bob", "cassie", "david", None],
"B": [10.5, -124.0, 11.11, 456.2, -8e2],
"C": [True, None, False, None, None],
}
)

query = 'SELECT * FROM "read_namespace"."bodo_iceberg_read_test"'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to list a docstring for what must exist and where at the top. In case we ever do an account cleanup/deletion. Also if there are creation steps including those would be great.

check_func(
impl,
(bc, query),
py_output=py_out,
sort_output=True,
reset_index=True,
)


def test_s3_tables_catalog_iceberg_write(s3_tables_catalog, memory_leak_check):
"""tests that writing tables works"""
import bodo_iceberg_connector as bic

in_df = pd.DataFrame(
{
"ints": list(range(100)),
"floats": [float(x) for x in range(100)],
"str": [str(x) for x in range(100)],
"dict_str": ["abc", "df"] * 50,
}
)
bc = bodosql.BodoSQLContext(catalog=s3_tables_catalog)
bc = bc.add_or_replace_view("TABLE1", in_df)
con_str = S3TablesConnectionType(s3_tables_catalog.warehouse).get_conn_str()
table_name = run_rank0(
lambda: gen_unique_table_id("bodosql_catalog_write_iceberg_table").upper()
)().lower()

def impl(bc, query):
bc.sql(query)
# Return an arbitrary value. This is just to enable a py_output
# so we can reuse the check_func infrastructure.
return 5

ctas_query = f'CREATE OR REPLACE TABLE "write_namespace"."{table_name}" AS SELECT * from __bodolocal__."TABLE1"'
exception_occurred_in_test_body = False
try:
# Only test with only_1D=True so we only insert into the table once.
check_func(
impl,
(bc, ctas_query),
only_1D=True,
py_output=5,
use_table_format=True,
)

@bodo.jit
def read_results(con_str, schema, table_name):
output_df = pd.read_sql_table(table_name, con=con_str, schema=schema)
return bodo.allgatherv(output_df)

output_df = read_results(con_str, "write_namespace", table_name)
assert_tables_equal(output_df, in_df, check_dtype=False)

except Exception as e:
# In the case that another exception ocurred within the body of the try,
# We may not have created a table to drop.
# because of this, we call delete_table in a try/except, to avoid
# masking the original exception
exception_occurred_in_test_body = True
raise e
finally:
if exception_occurred_in_test_body:
try:
run_rank0(bic.delete_table)(
bodo.io.iceberg.format_iceberg_conn(con_str),
"write_namespace",
table_name,
)
except Exception:
pass
else:
run_rank0(bic.delete_table)(
bodo.io.iceberg.format_iceberg_conn(con_str),
"write_namespace",
table_name,
)
14 changes: 13 additions & 1 deletion BodoSQL/calcite_sql/bodosql-calcite-application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<!-- Start Iceberg Dependencies. We may remove these as we cleanup the relationship with the Iceberg Connector. -->
<iceberg.version>1.5.2</iceberg.version>
<hadoop.version>3.3.3</hadoop.version>
<aws.version>2.20.26</aws.version>
<aws.version>2.29.26</aws.version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check how these changes impact the package size? May be relevant for the next pip publication.

Copy link
Contributor Author

@IsaacWarren IsaacWarren Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wheel is 3MB bigger since the last release, no cause for concern

<janino.version>3.1.10</janino.version>
<!-- End Iceberg Dependencies -->
</properties>
Expand Down Expand Up @@ -220,6 +220,18 @@
<artifactId>s3</artifactId>
<version>${aws.version}</version>
</dependency>
<!-- S3Tables Dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3tables</artifactId>
<version>${aws.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.s3tables</groupId>
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
<version>0.1.3</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ public class BodoSQLRelDataTypeSystem extends RelDataTypeSystemImpl {
public boolean enableStreamingSortLimitOffset;

public static class CatalogContext {
public String currentDatabase;
public @Nullable String currentDatabase;
public String currentAccount;

public CatalogContext(String database, String account) {
public CatalogContext(@Nullable String database, String account) {
this.currentDatabase = database;
this.currentAccount = account;
}
}

/*
CURRENT_DATABSE: Name of the database in use for the current session.
CURRENT_DATABASE: Name of the database in use for the current session.

*/
private CatalogContext currentDatabase;
Expand Down
Loading
Loading