Skip to content

Commit

Permalink
ETL_0.0.1
Browse files Browse the repository at this point in the history
fix bugs in organizer.py
  • Loading branch information
mbsuraj committed May 26, 2021
1 parent 6bef208 commit 33a0a73
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def main():
# clean null data
# define schema
integer_columns = []
short_columns = ['Days']
short_columns = ['Days', 'PAY_DAY_SUPPLY_CNT', 'PAYABLE_QTY', 'pay_day_supply_count',
'charge_amount', 'rx_cost', 'net_paid_amount', 'member_responsible_amount']
string_columns = [c for c in humana_df.columns if c not in integer_columns + short_columns]
cast_dict = {
IntegerType: integer_columns,
Expand Down
29 changes: 15 additions & 14 deletions spark_pipelines/pyhumana/pyhumana_clean_pipeline/organizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ def organize(self, save_location=None):
assert save_location is not None, "Please set NullCleaner parameter 'save' to False if saving is not needed"
else:
assert save_location is None, "Please set NullCleaner parameter 'save' to True if saving is needed"
all_non_event_attributes = [x for x in self._df.columns if x.find('attr') == -1]

all_event_attributes = self._df_rules.values.flatten()
all_event_distinct_attributes = [x for x in set(all_event_attributes) if x is not np.nan]
event_category_attribute_relation = self._df_rules.to_dict('index')

all_attributes = ['id', 'event_descr', 'event_category', 'call_category', 'inquiry_reason_description',
'disposition_description', 'origin', 'location', 'place_of_treatment', 'drug_group_id',
'gpi_drug_class_description', 'generic_name', 'rx_cost', 'brand_name', 'net_paid_amount',
'cob', 'ndc_id', 'status_code', 'diagnosis', 'gpi_drug_group6_id', 'gpi_drug_group8_id',
'charge_amount', 'pay_day_supply_count', 'member_responsible_amount', 'claim_tier', 'drug_group_description',
'Days', 'PAY_DAY_SUPPLY_CNT', 'PAYABLE_QTY', 'MME', 'DRUG_TYPE', 'Specialty', 'Specialty2', 'Specialty3']
organized_df = None
for event_category in event_category_attribute_relation.keys():
temp = self._df.filter(self._df.event_category == event_category)
Expand All @@ -80,18 +85,14 @@ def organize(self, save_location=None):
temp = temp.withColumn(new_col, lit(None))

# arranging into a sequence of columns
all_attributes = all_non_event_attributes[0:2] + \
all_non_event_attributes[-1:] + \
[cat_attr for cat_attr in event_category_attribute_relation[event_category].values() if
cat_attr is not np.nan] + \
new_untouched_columns + \
all_non_event_attributes[2:-1]
temp = temp.select(*all_attributes)
if organized_df is None:
organized_df = temp.select(*all_attributes)
organized_df = temp
else:
organized_df = organized_df.union(temp)
organized_df = stp.drop_null_columns(organized_df)
if self._save:
organized_df.write.parquet(save_location)
else:
return organized_df

#organized_df = stp.drop_null_columns(organized_df)
if self._save:
organized_df.write.parquet(save_location)
else:
return organized_df

0 comments on commit 33a0a73

Please sign in to comment.