Getting elasticsearch batchable up and running
Joel Klinger committed Oct 26, 2018
1 parent 605acd9 commit 166795d
Showing 11 changed files with 136 additions and 73 deletions.
2 changes: 1 addition & 1 deletion docs/source/nesta.production.elasticsearch.rst
@@ -1 +1 @@
.. include:: ../../nesta/production/elasticsearch/README.rst
.. include:: ../../nesta/production/README-elasticsearch.rst
3 changes: 0 additions & 3 deletions make_requirements.sh
@@ -1,6 +1,3 @@
#!/bin/bash

pipreqs --force --ignore docs/ .
pip freeze | grep "PyMySQL" >> requirements.txt
pip freeze | grep "elasticsearch" >> requirements.txt

54 changes: 54 additions & 0 deletions nesta/production/README-elasticsearch.rst
@@ -0,0 +1,54 @@
Elasticsearch
=============

The following steps will take you through setting up Elasticsearch on an EC2
instance.

Launch the EC2 instance and SSH in so the following can be installed:

docker
------
:code:`sudo yum install docker -y`

docker-compose
--------------
``sudo curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose``
:code:`sudo chmod +x /usr/local/bin/docker-compose`
:code:`sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose`
more info: https://github.com/docker/compose/releases
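To check the install worked: :code:`docker-compose --version`
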
docker permissions
------------------
:code:`sudo usermod -a -G docker $USER`

more info: https://techoverflow.net/2017/03/01/solving-docker-permission-denied-while-trying-to-connect-to-the-docker-daemon-socket/

vm.max_map_count
----------------
Set permanently in */etc/sysctl.conf* by adding the following line:
:code:`vm.max_map_count=262144`

more info: https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html
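
The setting can also be applied to the running system, without waiting for a
reboot:

:code:`sudo sysctl -w vm.max_map_count=262144`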

python 3.6
----------
:code:`sudo yum install python36 -y`

*The machine now needs to be rebooted*
:code:`sudo reboot now`

Docker configuration
--------------------
Two further pieces of Docker configuration are needed before Elasticsearch
will run in a container (see the sketch below):

- max file descriptors must be raised for the container
- a *docker-compose.yml* is needed to define the service
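
A minimal *docker-compose.yml* along the lines below should be a reasonable
starting point; the image version, single-node discovery and port mapping are
illustrative rather than prescriptive:

.. code-block:: yaml

    version: '2.2'
    services:
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:6.4.2
        environment:
          - discovery.type=single-node
        ulimits:
          # Elasticsearch needs a generous max file descriptor limit
          nofile:
            soft: 65536
            hard: 65536
        ports:
          - 9200:9200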

Reindexing data from a remote cluster
-------------------------------------
- reindex permissions need to be set in the new cluster's *elasticsearch.yml*,
  by whitelisting the remote host via the :code:`reindex.remote.whitelist` setting
- if the existing cluster is AWS-hosted Elasticsearch, the new instance's IP
  address needs to be added to the security settings
- follow this guide: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#reindex-from-remote
- *index* and *query* do not need to be supplied
- if reindexing from AWS-hosted Elasticsearch the port should be 443 for HTTPS;
  this is mandatory in the JSON sent to the reindex API, as in the sketch below
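
As a rough sketch (the host and index names here are placeholders), the new
cluster's *elasticsearch.yml* first needs the remote whitelisted:

:code:`reindex.remote.whitelist: "old-cluster.example.com:443"`

after which the reindex can be triggered, e.g. with elasticsearch-py:

.. code-block:: python

    from elasticsearch import Elasticsearch

    # Client pointed at the new (destination) cluster
    es = Elasticsearch('localhost', port=9200)

    body = {
        "source": {
            # Port 443 must be given explicitly when the source is AWS-hosted ES
            "remote": {"host": "https://old-cluster.example.com:443"},
            "index": "projects"
        },
        "dest": {"index": "projects"}
    }

    # The reindex runs server-side; a task id is returned rather than blocking
    es.reindex(body=body, wait_for_completion=False)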

11 changes: 5 additions & 6 deletions nesta/production/batchables/health_data/nih_process_data/run.py
@@ -8,20 +8,20 @@
from nesta.packages.health_data.process_nih import country_iso_code_dataframe
from nesta.packages.health_data.process_nih import geocode_dataframe
from nesta.production.orms.orm_utils import get_mysql_engine
from nesta.production.orms.world_reporter_orm import Projects
from nesta.production.orms.nih_orm import Projects


def run():
start_index = os.environ["BATCHPAR_start_index"]
end_index = os.environ["BATCHPAR_end_index"]
mysqldb_config = os.environ["BATCHPAR_config"]
#mysqldb_config = os.environ["BATCHPAR_config"]
es_host = os.environ["BATCHPAR_outinfo"]
es_port = os.environ["BATCHPAR_out_port"]
es_index = os.environ["BATCHPAR_out_index"]
es_type = os.environ["BATCHPAR_out_type"]
db = os.environ["BATCHPAR_database"]
db = os.environ["BATCHPAR_db"]

engine = get_mysql_engine(mysqldb_config, "mysqldb", db)
engine = get_mysql_engine("BATCHPAR_config", "mysqldb", db)
Session = sessionmaker(bind=engine)
session = Session()

@@ -58,8 +58,7 @@ def run():
df[col] = df[col].apply(_extract_date)

# apply schema
schema = 'nesta/production/schemas/tier_1/schema_transformations/nih.json'
df = schema_transformer(df, filename=schema,
df = schema_transformer(df, filename="nih.json",
from_key='tier_0', to_key='tier_1',
ignore=['application_id'])

41 changes: 20 additions & 21 deletions nesta/production/orms/orm_utils.py
@@ -142,26 +142,26 @@ def get_mysql_engine(db_env, section, database="production_tests"):
return create_engine(url, connect_args={"charset":"utf8mb4"})


def get_elasticsearch_config(es_env, section):
'''Loads local configuration for elasticsearch.
Args:
es_env (str): name of the environmental variable holding the path to the config
section (str): section of the document holding the relevent configuration
Returns:
(dict): settings for elasticsearch
'''
conf_path = os.environ[es_env]
cp = ConfigParser()
cp.read(conf_path)
conf = dict(cp._sections[section])
es_config = {'host': conf['host'],
'port': conf['port'],
'index': conf['index'],
'type': conf['type']
}
return es_config
# def get_elasticsearch_config(es_env, section):
# '''Loads local configuration for elasticsearch.

# Args:
# es_env (str): name of the environmental variable holding the path to the config
# section (str): section of the document holding the relevent configuration

# Returns:
# (dict): settings for elasticsearch
# '''
# conf_path = os.environ[es_env]
# cp = ConfigParser()
# cp.read(conf_path)
# conf = dict(cp._sections[section])
# es_config = {'host': conf['host'],
# 'port': conf['port'],
# 'index': conf['index'],
# 'type': conf['type']
# }
# return es_config


def create_elasticsearch_index(index_name, es_client, config_path=None):
@@ -180,5 +180,4 @@ def create_elasticsearch_index(index_name, es_client, config_path=None):
with open(config_path) as f:
config = json.load(f)
response = es_client.indices.create(index=index_name, body=config)
print(response)
return response
6 changes: 3 additions & 3 deletions nesta/production/orms/world_reporter_es_config.json
@@ -50,13 +50,13 @@
"coordinate_of_organisation": {
"type": "geo_point"
},
"id_countryIso2_organisation": {
"id_iso2_country": {
"type": "keyword"
},
"id_countryIso3_organisation": {
"id_iso3_country": {
"type": "keyword"
},
"id_countryIsoNumeric_organisation": {
"id_isoNumeric_country": {
"type": "keyword"
}
}
nesta/production/routines/health_data/nih_data/nih_collect_task.py
@@ -36,17 +36,18 @@ class CollectTask(autobatch.AutoBatchTask):
data in the MySQL server.
Args:
date (datetime): Date used to label the outputs
date (datetime): Datetime used to label the outputs
_routine_id (str): String used to label the AWS task
db_config_path: (str) The output database configuration
'''
date = luigi.DateParameter()
_routine_id = luigi.Parameter()
db_config_path = luigi.Parameter()
production = luigi.BoolParameter()

def output(self):
'''Points to the output database engine'''
db_config = misctools.get_config(self.db_config_path, "mysqldb")
db_config["database"] = "production" if self.production else "dev"
db_config["database"] = "production" if not self.test else "dev"
db_config["table"] = "NIH <dummy>" # Note, not a real table
update_id = "NihCollectData_{}".format(self.date)
return MySqlTarget(update_id=update_id, **db_config)
@@ -64,7 +65,7 @@ def prepare(self):
params = {"table_name": table_name,
"url": url,
"config": "mysqldb.config",
"db_name": "production" if self.production else "dev",
"db_name": "production" if not self.test else "dev",
"outinfo": "s3://nesta-production-intermediate/%s" % url,
"done": done}
job_params.append(params)
42 changes: 27 additions & 15 deletions nesta/production/routines/health_data/nih_data/nih_process_task.py
@@ -17,43 +17,54 @@
from nesta.production.routines.health_data.nih_data.nih_collect_task import CollectTask
from nesta.production.luigihacks import autobatch, misctools
from nesta.production.luigihacks.mysqldb import MySqlTarget
from nesta.production.orms.orm_utils import get_elasticsearch_config
#from nesta.production.orms.orm_utils import get_elasticsearch_config
from nesta.production.orms.orm_utils import get_mysql_engine
from nesta.production.orms.world_reporter_orm import Projects
from nesta.production.orms.nih_orm import Projects
from nesta.production.luigihacks.misctools import find_filepath_from_pathstub

BATCH_SIZE = 50000
MYSQLDB_ENV = 'MYSQLDB'
ESCONFIG_ENV = 'ESCONFIG'
#ESCONFIG_ENV = 'ESCONFIG'


class ProcessTask(autobatch.AutoBatchTask):
'''A dummy root task, which collects the database configurations
and executes the central task.
Args:
date (datetime): Date used to label the outputs
date (str): Date used to label the outputs
_routine_id (str): String used to label the AWS task
db_config_path (str): Path to the MySQL database configuration
production (bool): Flag indicating whether running in testing
mode (False, default), or production mode (True).
'''
date = luigi.DateParameter(default=datetime.date.today())
date = luigi.DateParameter()
_routine_id = luigi.Parameter()
db_config_path = luigi.Parameter()
production = luigi.BoolParameter(default=False)

def requires(self):
'''Collects the database configurations
and executes the central task.'''
logging.getLogger().setLevel(logging.INFO)
yield CollectTask(date=self.date,
_routine_id=self._routine_id,
db_config_path=self.db_config_path,
production=self.production)
batchable=find_filepath_from_pathstub("batchables/health_data/nih_collect_data"),
env_files=[find_filepath_from_pathstub("nesta/nesta"),
find_filepath_from_pathstub("/production/config/mysqldb.config")],
job_def=self.job_def,
job_name="CollectTask-%s" % self._routine_id,
job_queue=self.job_queue,
region_name=self.region_name,
poll_time=10,
test=self.test,
memory=2048,
max_live_jobs=50)

def output(self):
'''Points to the input database target'''
update_id = "worldreporter-%s" % self._routine_id
update_id = "NihProcessData-%s" % self._routine_id
db_config = misctools.get_config("mysqldb.config", "mysqldb")
db_config["database"] = "production"
db_config["table"] = "worldreporter"
db_config["database"] = "production" if not self.test else "dev"
db_config["table"] = "NIH process DUMMY" # Note, not a real table
return MySqlTarget(update_id=update_id, **db_config)

def batch_limits(self, query, batch_size):
@@ -93,15 +104,16 @@ def prepare(self):

# elasticsearch setup
es_mode = 'rwjf_prod' if not self.test else 'rwjf_dev'
es_config = get_elasticsearch_config(ESCONFIG_ENV, es_mode)
es = Elasticsearch(es_config['host'], port=es_config['port'], sniff_on_start=True)
es_config = misctools.get_config('elasticsearch.config', es_mode)
#es_config = get_elasticsearch_config(ESCONFIG_ENV, es_mode)
es = Elasticsearch(es_config['host'], port=es_config['port']) #, sniff_on_start=True)

batches = self.batch_limits(project_query, BATCH_SIZE)
job_params = []
for start, end in batches:
params = {'start_index': start,
'end_index': end,
'config': "mysqldb_config",
'config': "mysqldb.config",
'db': db,
'outinfo': es_config['host'],
'out_port': es_config['port'],
33 changes: 17 additions & 16 deletions nesta/production/routines/health_data/nih_data/nih_root_task.py
@@ -11,8 +11,9 @@
import luigi
import datetime
import logging
from nesta.production.luigihacks.misctools import find_filepath_from_pathstub

from _nih_process_task import ProcessTask
from nih_process_task import ProcessTask

class RootTask(luigi.WrapperTask):
'''A dummy root task, which collects the database configurations
@@ -31,21 +32,21 @@ class RootTask(luigi.WrapperTask):
def requires(self):
'''Collects the database configurations
and executes the central task.'''
_routine_id = "{}-{}".format(self.date, self.production)

logging.getLogger().setLevel(logging.INFO)
yield ProcessTask(date=self.date,
_routine_id=_routine_id,
db_config_path=self.db_config_path,
production=self.production,
batchable="home/ec2user/nesta/nesta/production/"
"batchables/health_data/world_reporter_process/",
env_files=["home/ec2user/nesta/nesta/production/schemas/tier1/schema_transformations/nih.json",
"home/ec2user/nesta/nesta/production/config/mysqldb.config",
"home/ec2user/nesta/nesta/production/config/elasticsearch.config",
"home/ec2user/nesta/nesta/production/health_data/",
"home/ec2user/nesta/nesta/production/orms/",
"home/ec2user/nesta/nesta/production/decorators/"]
# job_def="py36_amzn1_image",
# job_name="GroupDetails-%s" % _routine_id,
# job_queue="HighPriority",
# region_name="eu-west-2",
# poll_time=10,
)
test=(not self.production),
batchable=find_filepath_from_pathstub("batchables/health_data/nih_process_data"),
env_files=[find_filepath_from_pathstub("nesta/nesta/"),
find_filepath_from_pathstub("config/mysqldb.config"),
find_filepath_from_pathstub("config/elasticsearch.config"),
find_filepath_from_pathstub("nih.json")],
job_def="py36_amzn1_image",
job_name="ProcessTask-%s" % _routine_id,
job_queue="HighPriority",
region_name="eu-west-2",
poll_time=10,
max_live_jobs=50)
nesta/production/schemas/tier_1/schema_transformations/nih.json
@@ -14,7 +14,7 @@
{"tier_0": "total_cost", "tier_1": "cost_total_project"},
{"tier_0": "abstract_text", "tier_1": "textBody_abstract_project"},
{"tier_0": "coordinates", "tier_1": "coordinate_of_organisation"},
{"tier_0": "country_alpha_2", "tier_1": "id_iso2_organisation"},
{"tier_0": "country_alpha_3", "tier_1": "id_iso3_organisation"},
{"tier_0": "country_alpha_numeric", "tier_1": "id_isoNumeric_organisation"}
{"tier_0": "country_alpha_2", "tier_1": "id_iso2_country"},
{"tier_0": "country_alpha_3", "tier_1": "id_iso3_country"},
{"tier_0": "country_alpha_numeric", "tier_1": "id_isoNumeric_country"}
]
2 changes: 1 addition & 1 deletion nesta/production/schemas/tier_1/tier_1.json
@@ -8,7 +8,7 @@
"term": "middleName",
"values": ["of", "fiscal", "city", "country", "zipcode", "state", "total",
"official", "start", "end", "descriptive", "abstract", "duns", "iso2",
"iso3", "isoNumeric"]
"iso3", "isoNumeric"]
},
{
"term": "lastName",
