Skip to content
This repository has been archived by the owner on Aug 13, 2021. It is now read-only.

Commit

Permalink
[266] Refactor and simplify ES configuration (#275)
Browse files Browse the repository at this point in the history
* make sure conf dir is empty

* simplified es config

* added orm es config reader

* modified setup_es to pick up new es config

* swapped es_mode for boolean

* aliases now consistent with config

* aliases now automatically located

* added endpoint field to estasks

* added endpoint field to sql2estasks

* [267] Pool ES mappings across datasets (#280)

* changed branch name

* mappings build

* updated docs

* updated docs

* updated docs

* added docstrings

* added dynamic strict to settings

* removed index.json in favour of a single defaults file

* using soft alias until a future PR to minimise changes

* cleaned and sorted json

* [267] Tidy & slim schema transformations (#281)

* pruned deprecated schema transformations

* updated fos fieldname on arxlive

* unified data set schema transformations

* restructured directory

* refactored references to schema_transformation

* refactored references to schema_transformation

* slimmed down transformations, and included entity_type

* pruned ontology

* tidied schemas

* consistency tests

* reverted unrelated json file

* harmonised name fieldsofstudy across arxiv

* added novelty back in

* sorted json

* sorted json

* sorted json

Co-authored-by: Joel Klinger <[email protected]>

Co-authored-by: Joel Klinger <[email protected]>

* patched out es config setup from tests

* removed redundant tests

* fixed json formatting

* none included for testing

* picked up bug in test

Co-authored-by: Joel Klinger <[email protected]>
  • Loading branch information
jaklinger and Joel Klinger committed Jun 9, 2020
1 parent ddbb401 commit f98955a
Show file tree
Hide file tree
Showing 84 changed files with 1,238 additions and 2,215 deletions.
1 change: 1 addition & 0 deletions docs/source/nesta.core.schemas.rst
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.. include:: ../../nesta/core/schemas/README.rst
.. include:: ../../nesta/core/schemas/tier_1/mappings/README.rst
3 changes: 0 additions & 3 deletions docs/source/nesta.core.scripts.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
Scripts
=======

.. include:: ../../nesta/core/scripts/README.rst
16 changes: 6 additions & 10 deletions nesta/core/batchables/arxiv/arxiv_elasticsearch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from datetime import datetime as dt

from nesta.core.orms.orm_utils import db_session, get_mysql_engine
from nesta.core.orms.orm_utils import load_json_from_pathstub
from nesta.core.orms.orm_utils import object_to_dict
from nesta.core.orms.arxiv_orm import Article
from nesta.core.orms.grid_orm import Institute
Expand Down Expand Up @@ -76,10 +75,7 @@ def run():
ngrammer = Ngrammer(database="production")

# es setup
strans_kwargs={'filename':'arxiv.json',
'from_key':'tier_0',
'to_key':'tier_1',
'ignore':['id']}
strans_kwargs={'filename':'arxiv.json', 'ignore':['id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down Expand Up @@ -164,9 +160,9 @@ def run():
countries = set(grid_countries[inst_id]
for inst_id in good_institutes
if inst_id in grid_countries)
row['categories'], _, _ = hierarchy_field(cats)
row['fos'], _, _ = hierarchy_field(fos)
row['countries'], _, _ = hierarchy_field(countries)
row['nested_categories'], _, _ = hierarchy_field(cats)
row['fields_of_study'], _, _ = hierarchy_field(fos)
row['nested_location'], _, _ = hierarchy_field(countries)

# Pull out international institute info
has_mn = any(is_multinational(inst,
Expand Down Expand Up @@ -216,8 +212,8 @@ def run():

if 'BATCHPAR_outinfo' not in os.environ:
from nesta.core.orms.orm_utils import setup_es
es, es_config = setup_es('dev', True, True,
dataset='arxiv')
es, es_config = setup_es(endpoint='arxlive', dataset='arxiv',
production=False, drop_and_recreate=True)
environ = {'batch_file': ('ArxivESTask-2019-09-19-'
'False-1568888970724721.json'),
'config': ('/home/ec2-user/nesta-eu/nesta/'
Expand Down
14 changes: 5 additions & 9 deletions nesta/core/batchables/crunchbase/crunchbase_elasticsearch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ def run():
continent_lookup[None] = None

# es setup
field_null_mapping = load_json_from_pathstub("tier_1/field_null_mappings/",
"health_scanner.json")
strans_kwargs={'filename':'crunchbase_organisation_members.json',
'from_key':'tier_0',
'to_key':'tier_1',
'ignore':['id']}
field_null_mapping = load_json_from_pathstub("health-scanner", "nulls.json")
strans_kwargs = {'filename': 'companies.json', 'ignore': ['id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down Expand Up @@ -162,9 +158,9 @@ def run():

if 'BATCHPAR_outinfo' not in os.environ:
from nesta.core.orms.orm_utils import setup_es
es, es_config = setup_es('dev', True, True,
dataset='crunchbase',
aliases='health_scanner')
es, es_config = setup_es(production=False, endpoint='health-scanner',
dataset='companies',
drop_and_recreate=True)

environ = {"AWSBATCHTEST": "",
'BATCHPAR_batch_file': 'crunchbase_to_es-15597291977144725.json',
Expand Down
8 changes: 3 additions & 5 deletions nesta/core/batchables/eurito/arxiv_eu/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ def run():

# es setup
logging.info('Connecting to ES')
strans_kwargs={'filename':'eurito/arxiv-eu.json',
'from_key':'tier_0', 'to_key':'tier_1',
'ignore':['id']}
strans_kwargs = {'filename': 'arxiv.json', 'ignore': ['id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down Expand Up @@ -202,8 +200,8 @@ def run():
set_log_level()
if 'BATCHPAR_outinfo' not in os.environ:
from nesta.core.orms.orm_utils import setup_es
es, es_config = setup_es('dev', True, True,
dataset='arxiv-eu')
es, es_config = setup_es(production=False, endpoint='eurito',
dataset='arxiv', drop_and_recreate=True)
environ = {'config': ('/home/ec2-user/nesta-eu/nesta/'
'core/config/mysqldb.config'),
'batch_file' : ('arxiv-eu_EURITO-ElasticsearchTask-'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ def run():
eu_countries = get_eu_countries()

# es setup
strans_kwargs={'filename':'eurito/crunchbase-eu.json',
'from_key':'tier_0',
'to_key':'tier_1',
'ignore':['id']}
strans_kwargs = {'filename': 'companies.json', 'ignore': ['id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down
8 changes: 3 additions & 5 deletions nesta/core/batchables/eurito/cordis_eu/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ def run():

# es setup
logging.info('Connecting to ES')
strans_kwargs={'filename':'eurito/cordis-eu.json',
'from_key':'tier_0', 'to_key':'tier_1',
'ignore':['id']}
strans_kwargs = {'filename': 'cordis.json', 'ignore': ['id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down Expand Up @@ -132,8 +130,8 @@ def run():
if 'BATCHPAR_outinfo' not in os.environ:
from nesta.core.orms.orm_utils import setup_es
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub
es, es_config = setup_es('dev', True, True,
dataset='cordis-eu')
es, es_config = setup_es(production=False, endpoint='eurito',
dataset='cordis', drop_and_recreate=True)
environ = {'config': find_filepath_from_pathstub('mysqldb.config'),
'batch_file' : ('cordis-eu_EURITO-ElasticsearchTask-'
'2020-04-10-True-15865345336407135.json'),
Expand Down
128 changes: 0 additions & 128 deletions nesta/core/batchables/eurito/patstat-eu/run.py

This file was deleted.

4 changes: 1 addition & 3 deletions nesta/core/batchables/eurito/patstat_eu/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ def run():

# es setup
logging.info('Connecting to ES')
strans_kwargs={'filename':'eurito/patstat-eu.json',
'from_key':'tier_0', 'to_key':'tier_1',
'ignore':['id']}
strans_kwargs = {'filename': 'patstat.json', 'ignore': ['id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ def run():
dupes = format_duplicate_map(dupes)

# Set up elastic search connection
field_null_mapping = load_json_from_pathstub("tier_1/"
"field_null_mappings/",
"health_scanner.json")
field_null_mapping = load_json_from_pathstub("health-scanner", "nulls.json")
es = ElasticsearchPlus(hosts=es_config['host'],
port=es_config['port'],
aws_auth_region=es_config['region'],
Expand Down
4 changes: 1 addition & 3 deletions nesta/core/batchables/health_data/nih_dedupe/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def run():
art_ids = json.loads(ids_obj.get()['Body']._raw_stream.read())
logging.info(f'Processing {len(art_ids)} article ids')

field_null_mapping = load_json_from_pathstub(("tier_1/"
"field_null_mappings/"),
"health_scanner.json")
field_null_mapping = load_json_from_pathstub("health-scanner", "nulls.json")
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down
14 changes: 4 additions & 10 deletions nesta/core/batchables/health_data/nih_process_data/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
def run():
start_index = os.environ["BATCHPAR_start_index"]
end_index = os.environ["BATCHPAR_end_index"]
#mysqldb_config = os.environ["BATCHPAR_config"]
es_host = os.environ["BATCHPAR_outinfo"]
es_port = os.environ["BATCHPAR_out_port"]
es_index = os.environ["BATCHPAR_out_index"]
Expand Down Expand Up @@ -87,13 +86,8 @@ def run():
df['total_cost_currency'] = 'USD'

# output to elasticsearch
field_null_mapping = load_json_from_pathstub("tier_1/field_null_mappings/",
"health_scanner.json")
strans_kwargs={'filename':'nih.json',
'from_key':'tier_0',
'to_key':'tier_1',
'ignore':['application_id']}

field_null_mapping = load_json_from_pathstub("health-scanner", "nulls.json")
strans_kwargs = {'filename': 'nih.json', 'ignore': ['application_id']}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down Expand Up @@ -143,15 +137,15 @@ def run():
pars = {'start_index': '2001360',
'end_index': '2003940',
'db': 'dev',
'done': 'False',
'config': (f'{os.environ["HOME"]}/nesta/nesta/'
'core/config/mysqldb.config'),
'done': 'False',
'outinfo': ('https://search-health-scanner-'
'5cs7g52446h7qscocqmiky5dn4.'
'eu-west-2.es.amazonaws.com'),
'out_index': 'nih_dev',
'out_type': '_doc',
'out_port': '_doc',
'out_port': '_443',
'aws_auth_region': 'eu-west-2',
'entity_type': 'paper',
'test': 'False'}
Expand Down
9 changes: 2 additions & 7 deletions nesta/core/batchables/meetup/topic_tag_elasticsearch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,8 @@ def run():
mesh_terms = format_mesh_terms(df_mesh)

# Setup ES+
field_null_mapping = load_json_from_pathstub(("tier_1/"
"field_null_mappings/"),
"health_scanner.json")
strans_kwargs={'filename':'meetup.json',
'from_key':'tier_0',
'to_key':'tier_1',
'ignore':[]}
field_null_mapping = load_json_from_pathstub("health-scanner", "nulls.json")
strans_kwargs = {'filename': 'meetup.json'}
es = ElasticsearchPlus(hosts=es_host,
port=es_port,
aws_auth_region=aws_auth_region,
Expand Down
Binary file removed nesta/core/config/elasticsearch.config
Binary file not shown.
Binary file added nesta/core/config/elasticsearch.yaml
Binary file not shown.
8 changes: 5 additions & 3 deletions nesta/core/luigihacks/estask.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class ElasticsearchTask(AutoBatchTask):
Args:
routine_id (str): Label for this routine.
db_config_path (str): Database config path.
endpoint (str): AWS domain name of the ES endpoint.
dataset (str): Name of the ES dataset.
entity_type (str): Entity type, for :obj:`ElasticsearchPlus`.
kwargs (dict): Any extra parameters to pass to the batchables.
Expand All @@ -27,6 +28,7 @@ class ElasticsearchTask(AutoBatchTask):
'''
routine_id = luigi.Parameter()
db_config_path = luigi.Parameter('mysqldb.config')
endpoint = luigi.Parameter()
dataset = luigi.Parameter()
entity_type = luigi.Parameter()
kwargs = luigi.DictParameter(default={})
Expand Down Expand Up @@ -72,10 +74,10 @@ def prepare(self):
" while in test mode")

# Setup elasticsearch and extract all ids
es_mode = 'dev' if self.test else 'prod'
es, es_config = setup_es(es_mode, self.test,
drop_and_recreate=False,
es, es_config = setup_es(endpoint=self.endpoint,
dataset=self.dataset,
production=not self.test,
drop_and_recreate=False,
increment_version=False)
ids = get_es_ids(es, es_config, size=10000) # All ids in this index
ids = ids - self._done_ids # Don't repeat done ids
Expand Down
Loading

0 comments on commit f98955a

Please sign in to comment.