Extended QPT to athena-Synapse #1831

Merged
merged 3 commits into from
Mar 25, 2024
Changes from 1 commit
Extended QPT to athena-synapse (#1805)
Jithendar12 authored and AbdulR3hman committed Mar 25, 2024
commit 7faeff8eaf209f6fc36e8b663beda1cb890db898
@@ -25,6 +25,7 @@ public final class SynapseConstants
public static final String DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
public static final int DEFAULT_PORT = 1433;
public static final String QUOTE_CHARACTER = "\"";
public static final String SQL_POOL = "azureServerless";

private SynapseConstants() {}
}
@@ -0,0 +1,71 @@
/*-
* #%L
* athena-synapse
* %%
* Copyright (C) 2019 - 2024 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.synapse;

import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;

import java.util.HashMap;
import java.util.Map;

public enum SynapseDataType {
BIT("bit", Types.MinorType.TINYINT.getType()),
TINYINT("tinyint", Types.MinorType.SMALLINT.getType()),
NUMERIC("numeric", Types.MinorType.FLOAT8.getType()),
SMALLMONEY("smallmoney", Types.MinorType.FLOAT8.getType()),
DATE("date", Types.MinorType.DATEDAY.getType()),
DATETIME("datetime", Types.MinorType.DATEMILLI.getType()),
DATETIME2("datetime2", Types.MinorType.DATEMILLI.getType()),
SMALLDATETIME("smalldatetime", Types.MinorType.DATEMILLI.getType()),
DATETIMEOFFSET("datetimeoffset", Types.MinorType.DATEMILLI.getType());

private static final Map<String, SynapseDataType> SYNAPSE_DATA_TYPE_MAP = new HashMap<>();

static {
for (SynapseDataType next : values()) {
SYNAPSE_DATA_TYPE_MAP.put(next.synapseType, next);
}
}

private String synapseType;
private ArrowType arrowType;

SynapseDataType(String synapseType, ArrowType arrowType)
{
this.synapseType = synapseType;
this.arrowType = arrowType;
}

public static ArrowType fromType(String synapseType)
{
SynapseDataType result = SYNAPSE_DATA_TYPE_MAP.get(synapseType);
return result.arrowType;
}

public static boolean isSupported(String dataType)
{
for (SynapseDataType synapseDataType : values()) {
if (synapseDataType.name().equalsIgnoreCase(dataType)) {
return true;
}
}
return false;
}
}
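
The enum above centralizes the Synapse-to-Arrow type overrides that were previously inlined as if-blocks in SynapseMetadataHandler (see the refactor further down in this diff). As a minimal usage sketch, assuming only the enum as defined here and an illustrative type name:

// Illustrative sketch, not part of the PR: map a JDBC-reported Synapse type name to an Arrow type.
String reportedType = "smallmoney"; // e.g. from ResultSetMetaData.getColumnTypeName()
if (SynapseDataType.isSupported(reportedType)) {
    ArrowType arrowType = SynapseDataType.fromType(reportedType); // resolves to FLOAT8 for smallmoney
    System.out.println(reportedType + " -> " + arrowType);
}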
@@ -48,6 +48,7 @@
import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
@@ -68,9 +69,11 @@
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -138,6 +141,8 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
TopNPushdownSubType.SUPPORTS_ORDER_BY
));

capabilities.put(jdbcQueryPassthrough.getFunctionSignature(), jdbcQueryPassthrough.getQueryPassthroughCapabilities());

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}
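
Registering the passthrough signature above is what advertises query passthrough (QPT) for this catalog. Assuming the connector follows the common Athena federation convention of a system.query table function (an assumption based on the shared JdbcQueryPassthrough class, not something shown in this diff), a user would invoke it from Athena roughly as:

SELECT * FROM TABLE(system.query(query => 'SELECT TOP 10 * FROM dbo.example'))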

@@ -254,6 +259,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Set<Split> splits = new HashSet<>();
@@ -320,6 +329,82 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
}
}

@Override
public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAllocator, final GetTableRequest getTableRequest)
throws Exception
{
if (!getTableRequest.isQueryPassthrough()) {
throw new IllegalArgumentException("No query passed through [" + getTableRequest + "]");
}

jdbcQueryPassthrough.verify(getTableRequest.getQueryPassthroughArguments());
String customerPassedQuery = getTableRequest.getQueryPassthroughArguments().get(JdbcQueryPassthrough.QUERY);

try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) {
PreparedStatement preparedStatement = connection.prepareStatement(customerPassedQuery);
ResultSetMetaData metadata = preparedStatement.getMetaData();
if (metadata == null) {
throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + customerPassedQuery);
}
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
if (SynapseConstants.SQL_POOL.equals(SynapseUtil.checkEnvironment(connection.getMetaData().getURL()))) {
HashMap<String, List<String>> columnNameAndDataTypeMap = new HashMap<>();
for (int columnIndex = 1; columnIndex <= metadata.getColumnCount(); columnIndex++) {
List<String> columnDetails = com.google.common.collect.ImmutableList.of(
metadata.getColumnTypeName(columnIndex),
String.valueOf(metadata.getPrecision(columnIndex)),
String.valueOf(metadata.getScale(columnIndex)));
columnNameAndDataTypeMap.put(metadata.getColumnName(columnIndex), columnDetails);
}
schemaBuilder = doDataTypeConversion(columnNameAndDataTypeMap);
}

else {
for (int columnIndex = 1; columnIndex <= metadata.getColumnCount(); columnIndex++) {
String columnName = metadata.getColumnName(columnIndex);
String columnLabel = metadata.getColumnLabel(columnIndex);
// TODO: is there a mechanism to pass both back to the engine?
columnName = columnName.equals(columnLabel) ? columnName : columnLabel;

int precision = metadata.getPrecision(columnIndex);
int scale = metadata.getScale(columnIndex);

ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
metadata.getColumnType(columnIndex),
precision,
scale,
configOptions);
String dataType = metadata.getColumnTypeName(columnIndex);

if (dataType != null && SynapseDataType.isSupported(dataType)) {
columnType = SynapseDataType.fromType(dataType);
}

if (columnType != null && SupportedTypes.isSupported(columnType)) {
if (columnType instanceof ArrowType.List) {
schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName(
metadata.getTableName(columnIndex),
metadata.getColumnDisplaySize(columnIndex),
precision));
}
else {
schemaBuilder.addField(FieldBuilder.newBuilder(columnName, columnType).build());
}
}
else {
// Default to VARCHAR ArrowType
LOGGER.warn("getSchema: Unable to map type for column[" + columnName +
"] to a supported type, attempted " + columnType + " - defaulting type to VARCHAR.");
schemaBuilder.addField(FieldBuilder.newBuilder(columnName, new ArrowType.Utf8()).build());
}
}
}

Schema schema = schemaBuilder.build();
return new GetTableResponse(getTableRequest.getCatalogName(), getTableRequest.getTableName(), schema, Collections.emptySet());
}
}
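
The passthrough schema above is derived from PreparedStatement metadata rather than from INFORMATION_SCHEMA queries. A stripped-down sketch of that idea outside the handler, assuming java.sql imports and an illustrative JDBC URL and query, would be:

// Illustrative sketch, not part of the PR: describe an arbitrary query via driver metadata without executing it.
try (Connection conn = DriverManager.getConnection("jdbc:sqlserver://<host>;databaseName=<db>");
     PreparedStatement ps = conn.prepareStatement("SELECT id, name FROM dbo.example")) {
    ResultSetMetaData md = ps.getMetaData();
    if (md == null) {
        throw new UnsupportedOperationException("Driver cannot describe this query");
    }
    for (int i = 1; i <= md.getColumnCount(); i++) {
        System.out.println(md.getColumnLabel(i) + " : " + md.getColumnTypeName(i));
    }
}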

/**
* Appropriate datatype to arrow type conversions will be done by fetching data types of columns
* @param jdbcConnection
@@ -358,10 +443,10 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema
}
}

if ("azureServerless".equalsIgnoreCase(SynapseUtil.checkEnvironment(jdbcConnection.getMetaData().getURL()))) {
if (SynapseConstants.SQL_POOL.equalsIgnoreCase(SynapseUtil.checkEnvironment(jdbcConnection.getMetaData().getURL()))) {
// The SQL Server driver's getColumns() throws an exception in the Azure Serverless environment,
// so we do explicit data type conversion instead
schemaBuilder = doDataTypeConversion(columnNameAndDataTypeMap, tableName.getSchemaName());
schemaBuilder = doDataTypeConversion(columnNameAndDataTypeMap);
}
else {
schemaBuilder = doDataTypeConversionForNonCompatible(jdbcConnection, tableName, columnNameAndDataTypeMap);
@@ -371,7 +456,7 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema
return schemaBuilder.build();
}

private SchemaBuilder doDataTypeConversion(HashMap<String, List<String>> columnNameAndDataTypeMap, String schemaName)
private SchemaBuilder doDataTypeConversion(HashMap<String, List<String>> columnNameAndDataTypeMap)
{
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();

@@ -451,48 +536,14 @@ private SchemaBuilder doDataTypeConversionForNonCompatible(Connection jdbcConnec
LOGGER.debug("columnName: " + columnName);
LOGGER.debug("dataType: " + dataType);

/**
* Converting date data type into DATEDAY since framework is unable to do it by default
*/
if ("date".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.DATEDAY.getType();
}
/**
* Converting bit data type into TINYINT because BIT type is showing 0 as false and 1 as true.
* we can avoid it by changing to TINYINT.
*/
if ("bit".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.TINYINT.getType();
}
/**
* Converting tinyint data type into SMALLINT.
* TINYINT range is 0 to 255 in SQL Server, usage of TINYINT(ArrowType) leads to data loss
* as its using 1 bit as signed flag.
*/
if ("tinyint".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.SMALLINT.getType();
}
/**
* Converting numeric, smallmoney data types into FLOAT8 to avoid data loss
* (ex: 123.45 is shown as 123 (loosing its scale))
*/
if ("numeric".equalsIgnoreCase(dataType) || "smallmoney".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.FLOAT8.getType();
}
/**
* Converting time data type(s) into DATEMILLI since framework is unable to map it by default
*/
if ("datetime".equalsIgnoreCase(dataType) || "datetime2".equalsIgnoreCase(dataType)
|| "smalldatetime".equalsIgnoreCase(dataType) || "datetimeoffset".equalsIgnoreCase(dataType)) {
columnType = Types.MinorType.DATEMILLI.getType();
if (dataType != null && SynapseDataType.isSupported(dataType)) {
columnType = SynapseDataType.fromType(dataType);
}

/**
* converting into VARCHAR for non supported data types.
*/
if (columnType == null) {
columnType = Types.MinorType.VARCHAR.getType();
}
if (columnType != null && !SupportedTypes.isSupported(columnType)) {
if ((columnType == null) || !SupportedTypes.isSupported(columnType)) {
columnType = Types.MinorType.VARCHAR.getType();
}

@@ -83,7 +83,15 @@ SynapseMetadataHandler.JDBC_PROPERTIES, new DatabaseConnectionInfo(SynapseConsta
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(),
schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
@@ -129,7 +137,7 @@ public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest rea
com.microsoft.sqlserver.jdbc.SQLServerException: '@@TRANCOUNT' is not supported.
So we skip connection.commit() in the Azure Serverless environment.
*/
if (!"azureServerless".equals(SynapseUtil.checkEnvironment(connection.getMetaData().getURL()))) {
if (!SynapseConstants.SQL_POOL.equalsIgnoreCase(SynapseUtil.checkEnvironment(connection.getMetaData().getURL()))) {
connection.commit();
}
}