Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35707][Table SQL / API] Allow column definition in CREATE TABLE AS (CTAS) #24987

Merged
merged 1 commit into from
Jul 8, 2024

Conversation

spena
Copy link
Contributor

@spena spena commented Jun 27, 2024

What is the purpose of the change

Allow defining new columns and/or overriding source column types in the CREATE clause on CTAS statements.
Syntax supported:

CREATE TABLE table_name [( <column_definition>[, ...n] )]
AS SELECT query_expression;

Brief change log

  • Added support for column definition in the CREATE clause

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests to validation and converter classes
  • Manually verified the change by running a single node cluster and sql client

Quick test using a compute column to check the expression is evaluated using source columns:

Flink SQL> create table t1(id int, name string) with ('connector'='filesystem', 'format'='json', 'path'='/tmp/t1');
[INFO] Execute statement succeeded.

Flink SQL> insert into t1 values (1, 's1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: bf61390313b4e173c1afb8e60ef5c2c9

Flink SQL> create table s1_1(id_2x as id * 2) with ('connector'='filesystem', 'format'='json', 'path'='/tmp/s1_1') as select id, name from t1;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b5cf09801d323c9008fbe7a20006e83e

Flink SQL> describe s1_1;
+-------+--------+------+-----+-------------+-----------+
|  name |   type | null | key |      extras | watermark |
+-------+--------+------+-----+-------------+-----------+
| id_2x |    INT | TRUE |     | AS `id` * 2 |           |
|    id |    INT | TRUE |     |             |           |
|  name | STRING | TRUE |     |             |           |
+-------+--------+------+-----+-------------+-----------+
3 rows in set

Flink SQL> select * from s1_1;

       id_2x          id                           name
           2           1                             s1

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs (will follow-up with another PR to update docs)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 27, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @spena.

  • We should add at least 3 ITCase tests as well that test the CTAS end-to-end. E.g. next to org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase#testCreateTableAsSelectWithoutOptions. For only left schema, only right schema, mixture of both.
  • What is the behavior if the table column is NOT NULL and the source schema doesn't fill it.
  • It would be great if there is a smart way of reducing code duplication among MergeTableLikeUtil and MergeTableAsUtil.

}

/**
* Merges the schema part of the {@code sqlCreateTableAs} with the {@code sourceSchema}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do you know that you can also use {@param sqlCreateTableAs} at random locations in JavaDocs? This way you can get full IDE support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find out how it helps. For example. the IDE pop up message shows me this formatted doc after adding the @param:

javadoc: Merges the schema part of the {@code sqlCreateTableAs} with the {@param sourceSchema}

Merges the schema part of the sqlCreateTableAs with the @param sourceSchema.

The sqlCreateTableAs is in bold in the doc, but the sourceSchema isn't.

I didn't find docs about how this is helpful. Do you have examples or a doc to read about it?

* Builder class for constructing a {@link Schema} based on the rules of the {@code CREATE TABLE
* ... AS SELECT} statement.
*/
private static class SchemaBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a lot of code that is duplicated from MergeTableLikeUtil. Isn't it somehow possible to merge the two utils? Isn't the sourceSchema similar to the CREATE TABLE x LIKE sourceSchema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way they're merged are different. The LIKE util adds the sourceSchema at the beginning and the sink columns at the end. It also depends on the LIKE STRATEGIES on how primary/partition keys, watermarks are merged, either OVERWRITING, IGNORING, EXCLUDING. There's no strategy for schema columns yet.

I was tempted to re-use that util class and add the support for column overwriting (currently, the LIKE util throws if a sink column exists in the source column), but I thought I will start messing with the CREATE TABLE LIKE and its behavior. I instead decided to create this other utility to prevent changing logic of another statement.

@spena spena force-pushed the flip_463_ctas_schema_def branch from 70a3367 to bf07667 Compare July 1, 2024 22:32
Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing my comments. I think the pull request will be ready in the next iteration.

import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;

/** A utility class for {@link SqlCreateTableConverter} conversions. */
public final class SqlConverterUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very generic name. How about calling it SchemaBuilderUtil or even better: let the other two extend from it. It seems there are no static methods, so why not extending?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Incompatible types for sink column 'f0' at position 0. "
+ "The source column has type [INT NOT NULL], while the target "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove [ or ]. if you want to wrap args use singe quotes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@spena spena force-pushed the flip_463_ctas_schema_def branch from bf07667 to 48bf513 Compare July 3, 2024 16:17
@spena spena requested a review from twalthr July 5, 2024 20:38
@twalthr twalthr merged commit 93d7f45 into apache:master Jul 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants