-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-243] Create NamedHivePartitionSensor #1593
Conversation
@@ -306,6 +307,17 @@ def sasl_factory(): | |||
def get_conn(self): | |||
return self.metastore | |||
|
|||
def check_for_named_partition(self, schema, table, partition_name): | |||
self.metastore._oprot.trans.open() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add comments / doctest similar to check_for_partition()
?
Current coverage is 63.90%@@ master #1593 diff @@
==========================================
Files 121 121
Lines 8462 8512 +50
Methods 0 0
Messages 0 0
Branches 0 0
==========================================
+ Hits 5422 5440 +18
- Misses 3040 3072 +32
Partials 0 0
|
class NamedHivePartitionSensor(BaseSensorOperator): | ||
""" | ||
Waits for a partition to show up in Hive. Only accepts plain partition | ||
names (eg dt=xxx) for the partition argument and no filters. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only accepts fully specified partition identifier strings of the form <DB name>.<table name>/<partition name>
e.g. default.users/ds=2016-01-01
.
@mistercrunch and I thought that adding warning messages to |
@plypaul Added stuff to the docs - what do you mean by adding a warning message to HivePartitionSensor? |
A note in the Also, a similar warning can be printed through the logger when the operator actually runs. |
@plypaul I put the docstring warning, but I don't think we should log a warning because there could be legitimate cases where |
:param partition_names: List of fully qualified names of the | ||
partitions to wait for. A fully qualified name is of the | ||
form schema.table/pk1=pv1/pk2=pv2, for example, | ||
default.users/ds=2016-0101. This is passed as is to the Metastore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: 2016-01-01, metastore is lowercase.
Nice work @zodiac! LGTM |
@@ -163,6 +163,13 @@ def test_hive_stats(self): | |||
dag=self.dag) | |||
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) | |||
|
|||
def test_named_hive_partition_sensor(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing could be a bit more comprehensive (e.g. make sure exception is thrown), confirming correct state/return value of poke after some calls etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added 2 other test to exercise multiple partition and nonexistent partition logic. The order in which things are poked, etc seems like an implementation detail and should not be baked into tests. That poking works is tested in the docstring for check_for_named_partition
. Let me know if there are other specific tests you think should be in here.
are there any numbers on the performance difference between Named and HivePartitionSensor? |
nice work @zodiac |
and apparently supports SQL like notation as in `ds='2015-01-01' | ||
AND type='value'` and > < sings as in "ds>=2015-01-01" | ||
:type partition: string | ||
:param metastore_conn_id: reference to the metastore thrift service | ||
connection id | ||
:type metastore_conn_id: str | ||
|
||
Note: Because @partition supports general logical operators, it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move the note up and maybe make it a strong recommendation to use the NamedHivePartitionSensor?
Looks good to me 👍 |
Create NamedHivePartitionSensor, which calls get_partitions_by_name instead of the less efficient but more general get_partitions_by_filter
@@ -264,7 +340,6 @@ def poke(self, context): | |||
'Poking for table {self.schema}.{self.table}, ' | |||
'partition {self.partition}'.format(**locals())) | |||
if not hasattr(self, 'hook'): | |||
import airflow.hooks.hive_hooks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was the line removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think because it's not needed due to this.
Closes apache#1593 from zodiac/create-NamedHivePartitionSensor
Dear Airflow Maintainers,
Please accept this PR that addresses the following issues:
Testing Done:
Created dag using NamedHivePartitionSensor and check that it waits for the partition as expected
@plypaul