Skip to content

Commit

Permalink
Extended QPT to athena-sqlserver (awslabs#1823)
Browse files Browse the repository at this point in the history
Co-authored-by: AbdulRehman Faraj <[email protected]>
  • Loading branch information
AbdulR3hman and AbdulRehman Faraj committed Mar 28, 2024
1 parent 68294d5 commit dec75b6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*-
* #%L
* athena-sqlserver
* %%
* Copyright (C) 2019 - 2024 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.sqlserver;

import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;

/**
 * SQL Server column type names whose Arrow mapping is overridden by this connector
 * instead of being left to the default JDBC-derived conversion.
 *
 * <p>Name lookups ({@link #fromType(String)} and {@link #isSupported(String)}) are
 * case-insensitive and do not depend on the JVM default locale, so both methods always
 * agree on whether a given type name is known.
 */
public enum SqlServerDataType {
    // BIT is surfaced as TINYINT so values read as 0/1 rather than false/true.
    BIT(Types.MinorType.TINYINT.getType()),
    // SQL Server TINYINT ranges from 0 to 255, which does not fit a signed Arrow TINYINT.
    TINYINT(Types.MinorType.SMALLINT.getType()),
    // NUMERIC and SMALLMONEY are mapped to FLOAT8 so the fractional part is not dropped.
    NUMERIC(Types.MinorType.FLOAT8.getType()),
    SMALLMONEY(Types.MinorType.FLOAT8.getType()),
    DATE(Types.MinorType.DATEDAY.getType()),
    // All date-time flavours are mapped to DATEMILLI.
    DATETIME(Types.MinorType.DATEMILLI.getType()),
    DATETIME2(Types.MinorType.DATEMILLI.getType()),
    SMALLDATETIME(Types.MinorType.DATEMILLI.getType()),
    DATETIMEOFFSET(Types.MinorType.DATEMILLI.getType());

    private final ArrowType arrowType;

    SqlServerDataType(ArrowType arrowType)
    {
        this.arrowType = arrowType;
    }

    /**
     * Returns the Arrow type mapped to the given SQL Server type name.
     *
     * @param sqlServerType SQL Server type name, matched case-insensitively
     * @return the Arrow type associated with the matching constant
     * @throws NullPointerException if {@code sqlServerType} is {@code null}
     * @throws IllegalArgumentException if no constant matches {@code sqlServerType}
     */
    public static ArrowType fromType(String sqlServerType)
    {
        if (sqlServerType == null) {
            throw new NullPointerException("sqlServerType must not be null");
        }
        SqlServerDataType match = lookup(sqlServerType);
        if (match == null) {
            throw new IllegalArgumentException("Unsupported SQL Server data type: " + sqlServerType);
        }
        return match.arrowType;
    }

    /**
     * Tells whether the given SQL Server type name has an overridden Arrow mapping.
     *
     * @param dataType SQL Server type name, matched case-insensitively; may be {@code null}
     * @return {@code true} if a constant with that name exists, {@code false} otherwise
     */
    public static boolean isSupported(String dataType)
    {
        return lookup(dataType) != null;
    }

    /**
     * Finds the constant whose name equals {@code typeName} ignoring case.
     * Uses {@link String#equalsIgnoreCase(String)} rather than a default-locale
     * {@code toUpperCase()} so the result does not vary with the JVM locale.
     *
     * @return the matching constant, or {@code null} if there is none
     */
    private static SqlServerDataType lookup(String typeName)
    {
        for (SqlServerDataType candidate : values()) {
            if (candidate.name().equalsIgnoreCase(typeName)) {
                return candidate;
            }
        }
        return null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -203,6 +204,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
TopNPushdownSubType.SUPPORTS_ORDER_BY
));

jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions);
return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

Expand Down Expand Up @@ -300,6 +302,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
LOGGER.info("partitionContd: {}", partitionContd);
Expand Down Expand Up @@ -388,6 +394,18 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
}
}

/**
 * Resolves the Arrow type for a result-set column, preferring the SQL Server specific
 * mapping in {@link SqlServerDataType} and otherwise deferring to the parent implementation.
 *
 * @param columnIndex index of the column passed to {@code metadata}
 * @param precision column precision, forwarded to the parent implementation
 * @param configOptions connector configuration, forwarded to the parent implementation
 * @param metadata result-set metadata used to read the column's type name
 * @return the Arrow type to use for the column
 * @throws SQLException if the metadata lookup or the parent conversion fails
 */
@Override
protected ArrowType convertDatasourceTypeToArrow(int columnIndex, int precision, Map<String, String> configOptions, ResultSetMetaData metadata) throws SQLException
{
    final String typeName = metadata.getColumnTypeName(columnIndex);
    LOGGER.info("In convertDatasourceTypeToArrow: converting {}", typeName);

    // No SQL Server specific override: let the parent decide.
    if (typeName == null || !SqlServerDataType.isSupported(typeName)) {
        return super.convertDatasourceTypeToArrow(columnIndex, precision, configOptions, metadata);
    }

    LOGGER.debug("Sql Server Datatype is support: {}", typeName);
    return SqlServerDataType.fromType(typeName);
}

/**
* Appropriate datatype to arrow type conversions will be done by fetching data types of columns
* @param jdbcConnection
Expand Down Expand Up @@ -436,48 +454,13 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema
LOGGER.debug("columnName: " + columnName);
LOGGER.debug("dataType: " + dataType);

/**
* Converting date data type into DATEDAY since framework is unable to do it by default
*/
if ("date".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.DATEDAY.getType();
}
/**
* Converting bit data type into TINYINT because BIT type is showing 0 as false and 1 as true.
* we can avoid it by changing to TINYINT.
*/
if ("bit".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.TINYINT.getType();
}
/**
* Converting tinyint data type into SMALLINT.
* TINYINT range is 0 to 255 in SQL Server; using TINYINT (ArrowType) leads to data loss
* because it uses 1 bit as the sign flag.
*/
if ("tinyint".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.SMALLINT.getType();
}
/**
* Converting numeric, smallmoney data types into FLOAT8 to avoid data loss
* (ex: 123.45 is shown as 123 (losing its scale))
*/
if ("numeric".equalsIgnoreCase(dataType) || "smallmoney".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.FLOAT8.getType();
}
/**
* Converting time data type(s) into DATEMILLI since framework is unable to map it by default
*/
if ("datetime".equalsIgnoreCase(dataType) || "datetime2".equalsIgnoreCase(dataType)
|| "smalldatetime".equalsIgnoreCase(dataType) || "datetimeoffset".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.DATEMILLI.getType();
if (dataType != null && SqlServerDataType.isSupported(dataType)) {
columnType = SqlServerDataType.fromType(dataType);
}
/**
* converting into VARCHAR for non supported data types.
*/
if (columnType == null) {
columnType = Types.MinorType.VARCHAR.getType();
}
if (columnType != null && !SupportedTypes.isSupported(columnType)) {
if ((columnType == null) || !SupportedTypes.isSupported(columnType)) {
columnType = Types.MinorType.VARCHAR.getType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,15 @@ public SqlServerRecordHandler(DatabaseConnectionConfig databaseConnectionConfig,
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints,
Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(),
schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(),
schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
Expand Down

0 comments on commit dec75b6

Please sign in to comment.