Skip to content

Commit

Permalink
Add try
Browse files Browse the repository at this point in the history
  • Loading branch information
capyvara committed Nov 23, 2022
1 parent fdf6239 commit b607085
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions tse/utils/parse_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from tse.utils import log_progress
from tse.parsers import CityConfigParser

from elasticsearch import Elasticsearch
from elasticsearch import Elasticsearch, TransportError
from elasticsearch.helpers import bulk

pd.options.mode.string_storage = "pyarrow"
Expand Down Expand Up @@ -174,8 +174,12 @@ def gen_docs():
})
yield doc

bulk(client=es_client, actions=gen_docs(), chunk_size=500)
logger.info("%s | Finished %s", worker_name(), log_filename)
try:
bulk(client=es_client, actions=gen_docs(), chunk_size=1000)
logger.info("%s | Finished %s", worker_name(), log_filename)
except TransportError as ex:
logger.warning("%s | Transport error on %s: %s", worker_name(), log_filename, repr(ex))
pass

return partition

Expand Down Expand Up @@ -318,7 +322,7 @@ def main():
create_voting_machine_logs_index(es_client)
create_voting_machine_logfiles_index(es_client)

with Client(LocalCluster(n_workers=12, threads_per_worker=1, silence_logs=False)) as client:
with Client(LocalCluster(n_workers=16, threads_per_worker=1, silence_logs=False)) as client:
#with Client(LocalCluster(processes=False, threads_per_worker=1, silence_logs=False)) as client:
logging.info("Init client: %s, dashboard: %s", client, client.dashboard_link)
#dask.config.set(scheduler='single-threaded') # Debug
Expand Down

0 comments on commit b607085

Please sign in to comment.