BSE-4358: BodoSQL support for S3 tables #126
@@ -0,0 +1,182 @@

"""Python and JIT class for describing an S3 Tables Iceberg catalog. An S3 Tables
catalog contains all information needed to connect to S3 Tables for organizing and modifying tables.
"""

import numba
from numba.core import cgutils, types
from numba.extending import (
    NativeValue,
    box,
    intrinsic,
    make_attribute_wrapper,
    models,
    overload,
    register_model,
    typeof_impl,
    unbox,
)

from bodo.io.iceberg import IcebergConnectionType
from bodo.utils.typing import get_literal_value, raise_bodo_error
from bodosql import DatabaseCatalog, DatabaseCatalogType
from bodosql.imported_java_classes import JavaEntryPoint


def _create_java_s3_tables_catalog(warehouse: str):
    """
    Create a Java BodoS3Tables object.

    Args:
        warehouse (str): The warehouse to connect to.

    Returns:
        JavaObject: A Java BodoS3Tables object.
    """
    return JavaEntryPoint.buildBodoS3TablesCatalog(warehouse)


class S3TablesCatalog(DatabaseCatalog):
    """
    Python class for storing the information
    needed to connect to an S3 Tables Iceberg catalog.
    """

    def __init__(self, warehouse: str):
        """
        Create an S3 Tables catalog from a connection string to an S3 Tables catalog.

        Args:
            warehouse (str): The warehouse to connect to.
        """
        self.warehouse = warehouse

    def get_java_object(self):
        return _create_java_s3_tables_catalog(self.warehouse)

    def __eq__(self, other):
        if not isinstance(other, S3TablesCatalog):
            return False
        return self.warehouse == other.warehouse


@overload(S3TablesCatalog, no_unliteral=True)
def overload_s3_tables_catalog_constructor(warehouse: str):
    raise_bodo_error("S3TablesCatalog: Cannot be created in JIT mode.")


class S3TablesCatalogType(DatabaseCatalogType):
    def __init__(self, warehouse: str):
        self.warehouse = warehouse
        super().__init__(name=f"S3TablesCatalog({self.warehouse=})")

    def get_java_object(self):
        return _create_java_s3_tables_catalog(self.warehouse)

    @property
    def key(self):
        return self.warehouse


@typeof_impl.register(S3TablesCatalog)
def typeof_s3_tables_catalog(val, c):
    return S3TablesCatalogType(warehouse=val.warehouse)


register_model(S3TablesCatalogType)(models.OpaqueModel)


@box(S3TablesCatalogType)
def box_s3_tables_catalog_type(typ, val, c):
    """
    Box an S3 Tables Catalog native representation into a Python object. We populate
    the contents based on typing information.
    """
    warehouse_obj = c.pyapi.from_native_value(
        types.unicode_type,
        c.context.get_constant_generic(c.builder, types.unicode_type, typ.warehouse),
        c.env_manager,
    )
    s3_tables_catalog_obj = c.pyapi.unserialize(
        c.pyapi.serialize_object(S3TablesCatalog)
    )
    res = c.pyapi.call_function_objargs(s3_tables_catalog_obj, (warehouse_obj,))
    c.pyapi.decref(warehouse_obj)
    c.pyapi.decref(s3_tables_catalog_obj)
    return res


@unbox(S3TablesCatalogType)
def unbox_s3_tables_catalog_type(typ, val, c):
    """
    Unbox an S3 Tables Catalog Python object into its native representation.
    Since the actual model is opaque we can just generate a dummy.
    """
    return NativeValue(c.context.get_dummy_value())


@numba.jit(types.unicode_type(types.unicode_type))
def get_conn_str(warehouse):
    """Get the connection string for an S3 Tables Iceberg catalog."""
    return warehouse


class S3TablesConnectionType(IcebergConnectionType):
    """
    Python class for storing the information needed to connect to an S3 Tables Iceberg catalog.
    The compiler can get a connection string using the get_conn_str function.
    The runtime can get a connection string using the conn_str attribute.
    """

    def __init__(self, warehouse):
        self.warehouse = warehouse
        self.conn_str = get_conn_str(warehouse)

        super().__init__(name=f"S3TablesConnectionType({warehouse=})")

    def get_conn_str(self) -> str:
        return "iceberg+" + self.conn_str


@intrinsic
def _get_s3_tables_connection(typingctx, warehouse, conn_str):
    """Create a struct model for an S3TablesConnectionType from a warehouse and connection string."""
    literal_warehouse = get_literal_value(warehouse)
    s3_tables_connection_type = S3TablesConnectionType(literal_warehouse)

    def codegen(context, builder, sig, args):
        """Lowering code to initialize an S3TablesConnectionType."""
        s3_tables_connection_type = sig.return_type
        s3_tables_connection_struct = cgutils.create_struct_proxy(
            s3_tables_connection_type
        )(context, builder)
        context.nrt.incref(builder, sig.args[1], args[1])
        s3_tables_connection_struct.conn_str = args[1]
        return s3_tables_connection_struct._getvalue()

    return s3_tables_connection_type(warehouse, conn_str), codegen


def get_s3_tables_connection(warehouse: str):
    pass


@overload(get_s3_tables_connection, no_unliteral=True)
def overload_get_s3_tables_connection(warehouse: str):
    """Overload for get_s3_tables_connection that creates an S3TablesConnectionType."""

    def impl(warehouse: str):  # pragma: no cover
        conn_str = get_conn_str(warehouse)
        return _get_s3_tables_connection(warehouse, conn_str)

    return impl


@register_model(S3TablesConnectionType)
class S3TablesConnectionModel(models.StructModel):
    """Model for S3TablesConnectionType that has one member, conn_str."""

    def __init__(self, dmm, fe_type):
        members = [
            ("conn_str", types.unicode_type),
        ]
        super().__init__(dmm, fe_type, members)


make_attribute_wrapper(S3TablesConnectionType, "conn_str", "conn_str")
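For orientation, here is a minimal usage sketch of the new catalog class, not part of the diff itself. The module path is inferred from the test imports later in this PR, the warehouse ARN is a placeholder, and the query reuses the `"read_namespace"."bodo_iceberg_read_test"` table exercised by the tests.

```python
# Hypothetical usage sketch; names marked below are assumptions, not part of this PR.
import bodosql
from bodosql.bodosql_types.s3_tables_catalog import S3TablesCatalog

# An S3 Tables catalog is identified by its warehouse; the ARN here is a placeholder.
catalog = S3TablesCatalog(
    warehouse="arn:aws:s3tables:us-east-2:123456789012:bucket/my-table-bucket"
)
bc = bodosql.BodoSQLContext(catalog=catalog)

# Tables are addressed as "<namespace>"."<table>", as in the tests below.
df = bc.sql('SELECT * FROM "read_namespace"."bodo_iceberg_read_test"')
```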
@@ -0,0 +1,111 @@

import pandas as pd

import bodo
import bodosql
from bodo.tests.utils import (
    assert_tables_equal,
    check_func,
    gen_unique_table_id,
    pytest_s3_tables,
    run_rank0,
)
from bodosql.bodosql_types.s3_tables_catalog import S3TablesConnectionType

pytestmark = pytest_s3_tables


def test_basic_read(memory_leak_check, s3_tables_catalog):
    """
    Test reading an entire Iceberg table from S3 Tables in SQL.
    """
    bc = bodosql.BodoSQLContext(catalog=s3_tables_catalog)

    def impl(bc, query):
        return bc.sql(query)

    py_out = pd.DataFrame(
        {
            "A": ["ally", "bob", "cassie", "david", None],
            "B": [10.5, -124.0, 11.11, 456.2, -8e2],
            "C": [True, None, False, None, None],
        }
    )

    query = 'SELECT * FROM "read_namespace"."bodo_iceberg_read_test"'
    check_func(
        impl,
        (bc, query),
        py_output=py_out,
        sort_output=True,
        reset_index=True,
    )


def test_s3_tables_catalog_iceberg_write(s3_tables_catalog, memory_leak_check):
    """Tests that writing tables works."""
    import bodo_iceberg_connector as bic

    in_df = pd.DataFrame(
        {
            "ints": list(range(100)),
            "floats": [float(x) for x in range(100)],
            "str": [str(x) for x in range(100)],
            "dict_str": ["abc", "df"] * 50,
        }
    )
    bc = bodosql.BodoSQLContext(catalog=s3_tables_catalog)
    bc = bc.add_or_replace_view("TABLE1", in_df)
    con_str = S3TablesConnectionType(s3_tables_catalog.warehouse).get_conn_str()
    table_name = run_rank0(
        lambda: gen_unique_table_id("bodosql_catalog_write_iceberg_table").upper()
    )().lower()

    def impl(bc, query):
        bc.sql(query)
        # Return an arbitrary value. This is just to enable a py_output
        # so we can reuse the check_func infrastructure.
        return 5

    ctas_query = f'CREATE OR REPLACE TABLE "write_namespace"."{table_name}" AS SELECT * from __bodolocal__."TABLE1"'
    exception_occurred_in_test_body = False
    try:
        # Only test with only_1D=True so we only insert into the table once.
        check_func(
            impl,
            (bc, ctas_query),
            only_1D=True,
            py_output=5,
            use_table_format=True,
        )

        @bodo.jit
        def read_results(con_str, schema, table_name):
            output_df = pd.read_sql_table(table_name, con=con_str, schema=schema)
            return bodo.allgatherv(output_df)

        output_df = read_results(con_str, "write_namespace", table_name)
        assert_tables_equal(output_df, in_df, check_dtype=False)

    except Exception as e:
        # If another exception occurred within the body of the try, we may not
        # have created a table to drop. Because of this, we call delete_table
        # in a try/except to avoid masking the original exception.
        exception_occurred_in_test_body = True
        raise e
    finally:
        if exception_occurred_in_test_body:
            try:
                run_rank0(bic.delete_table)(
                    bodo.io.iceberg.format_iceberg_conn(con_str),
                    "write_namespace",
                    table_name,
                )
            except Exception:
                pass
        else:
            run_rank0(bic.delete_table)(
                bodo.io.iceberg.format_iceberg_conn(con_str),
                "write_namespace",
                table_name,
            )
@@ -33,7 +33,7 @@
     <!-- Start Iceberg Dependencies. We may remove these as we cleanup the relationship with the Iceberg Connector. -->
     <iceberg.version>1.5.2</iceberg.version>
     <hadoop.version>3.3.3</hadoop.version>
-    <aws.version>2.20.26</aws.version>
+    <aws.version>2.29.26</aws.version>

Review comment: Can you check how these changes impact the package size? May be relevant for the next pip publication.
Reply: The wheel is 3MB bigger since the last release, no cause for concern.

     <janino.version>3.1.10</janino.version>
     <!-- End Iceberg Dependencies -->
   </properties>
@@ -220,6 +220,18 @@
       <artifactId>s3</artifactId>
       <version>${aws.version}</version>
     </dependency>
+    <!-- S3Tables Dependencies -->
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3tables</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.s3tables</groupId>
+      <artifactId>s3-tables-catalog-for-iceberg</artifactId>
+      <version>0.1.3</version>
+    </dependency>

     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers</artifactId>
Review comment: It would be good to list a docstring for what must exist and where at the top, in case we ever do an account cleanup/deletion. Also, if there are creation steps, including those would be great.
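A setup docstring of the kind requested might look roughly like the sketch below. The namespaces, table name, and column types are taken from the tests in this PR; the framing of the prerequisites is an assumption, not an existing part of the change.

```python
"""
S3 Tables integration tests (sketch of a possible setup docstring).

Assumed prerequisites in the AWS account used by the s3_tables_catalog fixture:
- An S3 table bucket whose ARN is passed as the catalog warehouse.
- A namespace "read_namespace" containing the Iceberg table
  "bodo_iceberg_read_test" with columns A (string), B (double), C (boolean).
- A namespace "write_namespace" in which the tests may create and drop tables.
"""
```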