Support PG array dimensionality
Add array support to postgres reader/converter with dimensionality read
from `pg_attribute.attndims`. This is grabbed by joining on
`information_schema.columns.column_name = pg_attribute.attname`

This is a follow-up to #410, which lays a tiny bit of groundwork for #264 (comment).
alexdemeo committed Jan 3, 2024
1 parent 049ef45 commit 16754f0
Showing 4 changed files with 376 additions and 37 deletions.
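
For context, a minimal sketch (not part of this change) of what `pg_attribute.attndims` reports; it assumes a psycopg2 connection and a hypothetical table `t` with a column declared `bigint[][]`:

```python
import psycopg2  # assumed driver; any DB-API connection works the same way

# Hypothetical connection string and table name.
conn = psycopg2.connect("dbname=example")
cur = conn.cursor()
# `attndims` records how many `[]` pairs the column was declared with,
# so a `bigint[][]` column reports 2 and non-array columns report 0.
cur.execute(
    """
    SELECT attname, attndims
    FROM pg_catalog.pg_attribute
    JOIN pg_catalog.pg_class ON pg_class.oid = pg_attribute.attrelid
    WHERE pg_class.relname = %s AND attnum > 0 AND NOT attisdropped
    """,
    ("t",),
)
print(cur.fetchall())  # e.g. [('tags', 2)]
```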
38 changes: 36 additions & 2 deletions recap/clients/postgresql.py
@@ -5,6 +5,7 @@

from recap.clients.dbapi import Connection, DbapiClient
from recap.converters.postgresql import PostgresqlConverter
from recap.types import StructType

PSYCOPG2_CONNECT_ARGS = {
"host",
@@ -48,8 +49,12 @@


class PostgresqlClient(DbapiClient):
def __init__(self, connection: Connection) -> None:
super().__init__(connection, PostgresqlConverter())
def __init__(
self,
connection: Connection,
converter: PostgresqlConverter = PostgresqlConverter(),
) -> None:
super().__init__(connection, converter)

@staticmethod
@contextmanager
@@ -78,3 +83,32 @@ def ls_catalogs(self) -> list[str]:
"""
)
return [row[0] for row in cursor.fetchall()]

def schema(self, catalog: str, schema: str, table: str) -> StructType:
cursor = self.connection.cursor()
cursor.execute(
f"""
SELECT
information_schema.columns.*,
pg_attribute.attndims
FROM information_schema.columns
JOIN pg_catalog.pg_namespace
ON pg_catalog.pg_namespace.nspname = information_schema.columns.table_schema
JOIN pg_catalog.pg_class
ON pg_catalog.pg_class.relname = information_schema.columns.table_name
AND pg_catalog.pg_class.relnamespace = pg_catalog.pg_namespace.oid
JOIN pg_catalog.pg_attribute
ON pg_catalog.pg_attribute.attrelid = pg_catalog.pg_class.oid
AND pg_catalog.pg_attribute.attname = information_schema.columns.column_name
WHERE table_name = {self.param_style}
AND table_schema = {self.param_style}
AND table_catalog = {self.param_style}
ORDER BY ordinal_position ASC
""",
(table, schema, catalog),
)
names = [name[0].upper() for name in cursor.description]
return self.converter.to_recap(
# Make each row be a dict with the column names as keys
[dict(zip(names, row)) for row in cursor.fetchall()]
)
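
A usage sketch of the new optional `converter` argument, assuming a live psycopg2 connection and hypothetical `testdb`/`public`/`test_table` identifiers:

```python
import psycopg2

from recap.clients.postgresql import PostgresqlClient
from recap.converters.postgresql import PostgresqlConverter

# Hypothetical connection details.
connection = psycopg2.connect(host="localhost", dbname="testdb")

client = PostgresqlClient(
    connection,
    # Opt in to fixed-dimension arrays; omitting the converter keeps the
    # default self-referencing list behavior.
    PostgresqlConverter(enforce_array_dimensions=True),
)
struct = client.schema("testdb", "public", "test_table")
print(struct)  # a recap StructType; array columns become nested ListTypes
```

The default (`enforce_array_dimensions=False`) preserves the existing behavior, so current callers are unaffected.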
67 changes: 46 additions & 21 deletions recap/converters/postgresql.py
@@ -8,6 +8,7 @@
FloatType,
IntType,
ListType,
NullType,
ProxyType,
RecapType,
RecapTypeRegistry,
@@ -24,7 +25,15 @@


class PostgresqlConverter(DbapiConverter):
def __init__(self, namespace: str = DEFAULT_NAMESPACE) -> None:
def __init__(
self,
enforce_array_dimensions: bool = False,
namespace: str = DEFAULT_NAMESPACE,
):
# Array dimensionality is not enforced by PG schemas:
# if `enforce_array_dimensions = False`, read arrays as self-referencing lists,
# irrespective of how many dimensions they declare
# if `enforce_array_dimensions = True`, read arrays as nested lists with
# `attndims` dimensions
self.enforce_array_dimensions = enforce_array_dimensions
self.namespace = namespace
self.registry = RecapTypeRegistry()

@@ -34,6 +43,7 @@ def _parse_type(self, column_props: dict[str, Any]) -> RecapType:
octet_length = column_props["CHARACTER_OCTET_LENGTH"]
max_length = column_props["CHARACTER_MAXIMUM_LENGTH"]
udt_name = (column_props["UDT_NAME"] or "").lower()
ndims = column_props["ATTNDIMS"]

if data_type in ["bigint", "int8", "bigserial", "serial8"]:
base_type = IntType(bits=64, signed=True)
@@ -102,29 +112,44 @@ def _parse_type(self, column_props: dict[str, Any]) -> RecapType:
# * 8 because bit columns use bits not bytes.
"CHARACTER_MAXIMUM_LENGTH": MAX_FIELD_SIZE * 8,
"UDT_NAME": None,
"ATTNDIMS": 0,
}
)
column_name_without_periods = column_name.replace(".", "_")
base_type_alias = f"{self.namespace}.{column_name_without_periods}"
# Construct a self-referencing list comprised of the array's value
# type and a proxy to the list itself. This allows arrays to be an
# arbitrary number of dimensions, which is how PostgreSQL treats
# lists. See https://github.com/recap-build/recap/issues/264 for
# more details.
base_type = ListType(
alias=base_type_alias,
values=UnionType(
types=[
value_type,
ProxyType(
alias=base_type_alias,
registry=self.registry,
),
],
),
)
self.registry.register_alias(base_type)
if self.enforce_array_dimensions:
base_type = self._create_n_dimension_list(value_type, ndims)
else:
column_name_without_periods = column_name.replace(".", "_")
base_type_alias = f"{self.namespace}.{column_name_without_periods}"
# Construct a self-referencing list comprised of the array's value
# type and a proxy to the list itself. This allows arrays to be an
# arbitrary number of dimensions, which is how PostgreSQL treats
# lists. See https://github.com/recap-build/recap/issues/264 for
# more details.
base_type = ListType(
alias=base_type_alias,
values=UnionType(
types=[
value_type,
ProxyType(
alias=base_type_alias,
registry=self.registry,
),
],
),
)
self.registry.register_alias(base_type)
else:
raise ValueError(f"Unknown data type: {data_type}")

return base_type

def _create_n_dimension_list(self, base_type: RecapType, ndims: int) -> RecapType:
"""
Build a list type with `ndims` dimensions containing nullable `base_type` as the innermost value type.
"""
if ndims == 0:
return UnionType(types=[NullType(), base_type])
else:
return ListType(
values=self._create_n_dimension_list(base_type, ndims - 1),
)
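
To illustrate the recursion above, a small sketch (using the private helper purely for demonstration) of the type produced for a `bigint[][]` column, i.e. `attndims = 2`:

```python
from recap.converters.postgresql import PostgresqlConverter
from recap.types import IntType

converter = PostgresqlConverter(enforce_array_dimensions=True)
# Two nested ListTypes whose innermost values are a nullable 64-bit int:
# ListType(values=ListType(values=UnionType(types=[NullType(), IntType(bits=64)])))
recap_type = converter._create_n_dimension_list(IntType(bits=64, signed=True), ndims=2)
print(recap_type)
```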