Skip to content

Commit

Permalink
Merge pull request imWildCat#75 from imWildCat/revert-71-run_once
Browse files Browse the repository at this point in the history
Revert "Enable Running Standalone Components"
  • Loading branch information
imWildCat committed Apr 7, 2019
2 parents dd1a6d5 + 211ba2f commit bd16f69
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 90 deletions.
Empty file modified package.json
100644 → 100755
Empty file.
Empty file modified requirements.txt
100644 → 100755
Empty file.
28 changes: 7 additions & 21 deletions scylla/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ def main(args) -> int:
help='The hostname for the web server')
parser.add_argument('--skip-scheduler', action='store_true',
help='Prevent the scheduler from crawling')
parser.add_argument('--scheduler-run-once', action='store_true',
help='Run all tasks in scheduler only once')
parser.add_argument('--version', '-v', action='store_true',
help='Print the version of Scylla')
parser.add_argument('--db-path', type=str, default='./scylla.db',
Expand All @@ -48,45 +46,33 @@ def main(args) -> int:
from scylla.database import create_db_tables
from scylla.loggings import logger
from scylla.scheduler import Scheduler
from scylla.web import start_web_server_non_blocking
from scylla.web import start_web_server
from scylla.proxy import start_forward_proxy_server_non_blocking

create_db_tables()

s = Scheduler()
p_web, p_proxy = None, None

try:
# scheduler
if not get_config('skip_scheduler'):
run_once = bool(get_config('scheduler_run_once'))
logger.info('Start scheduler, run_once=%s' % run_once)
s.start(run_once)
s.start()

            # forward proxy server
if not get_config('no_forward_proxy_server'):
logger.info('Start forward proxy server')
p_web = start_forward_proxy_server_non_blocking()
start_forward_proxy_server_non_blocking()

# web server
if not get_config('no_webserver'):
logger.info('Start web server')
p_proxy = start_web_server_non_blocking(workers=1)

# exit
if s.is_alive():
s.join()
logger.info('scheduler done.')
if p_web or p_proxy:
p_web.join()
p_proxy.join()
logger.info('Start the web server')
start_web_server(
host=parsed_args_dict['web_host'], port=parsed_args_dict['web_port'])

s.join()
except (KeyboardInterrupt, SystemExit):
logger.info('catch KeyboardInterrupt, exiting...')
s.stop()
sys.exit(0)

logger.info('scylla exiting...')
return 0


Expand Down
1 change: 0 additions & 1 deletion scylla/proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,3 @@ def start_forward_proxy_server():
def start_forward_proxy_server_non_blocking():
    """Launch the forward proxy server in a daemon child process.

    :return: the started ``multiprocessing.Process`` so the caller can
        join or monitor it.
    """
    proxy_process = Process(target=start_forward_proxy_server, daemon=True)
    proxy_process.start()
    return proxy_process
91 changes: 37 additions & 54 deletions scylla/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import time
import queue
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from multiprocessing import Queue, Process
Expand All @@ -17,22 +16,18 @@

FEED_FROM_DB_INTERVAL_MINUTES = 30

def fetch_ips(q: Queue, validator_queue: Queue, run_once=False):
logger.debug('worker_process started.')
logger.info('fetching ips...')

def fetch_ips(q: Queue, validator_queue: Queue):
logger.debug('fetch_ips...')
worker = Worker()

while True:
try:
if run_once and q.empty():
raise SystemExit
break

provider: BaseProvider = q.get()

provider_name = provider.__class__.__name__

logger.info('Get a provider from the provider queue: ' + provider_name)
logger.debug('Get a provider from the provider queue: ' + provider_name)

for url in provider.urls():

Expand All @@ -50,39 +45,28 @@ def fetch_ips(q: Queue, validator_queue: Queue, run_once=False):
)
except (KeyboardInterrupt, InterruptedError, SystemExit):
worker.stop()
logger.info('worker_process exited.')
break
except pyppeteer.errors.PyppeteerError as e:
logger.error("""pyppeteer.errors.PyppeteerError detected: %s\n
logger.debug("""pyppeteer.errors.PyppeteerError detected: %s\n
'Please make sure you have installed all the dependencies for chromium correctly""", e)
break

logger.debug('worker_process exited.')

def validate_ips(validator_queue: Queue, validator_pool: ThreadPoolExecutor, run_once=False):
logger.debug('validator_thread started.')

def validate_ips(validator_queue: Queue, validator_pool: ThreadPoolExecutor):
while True:
try:
## wait 5 mins for next proxy ip in run once mode
proxy: ProxyIP = validator_queue.get(timeout=300 if run_once else None)
proxy: ProxyIP = validator_queue.get()

validator_pool.submit(validate_proxy_ip, p=proxy)
except (KeyboardInterrupt, SystemExit):
break
except queue.Empty:
logger.debug('validator_thread has timed out.')
break

logger.debug('validator_thread exited.')

validator_pool.shutdown(wait=True)
logger.debug('validator_pool exited.')


def cron_schedule(scheduler, run_once=False):
def cron_schedule(scheduler, only_once=False):
"""
:param scheduler: the Scheduler instance
:param run_once: flag for testing
:param only_once: flag for testing
"""

def feed():
Expand All @@ -95,34 +79,35 @@ def feed_from_db():
for p in proxies:
scheduler.validator_queue.put(p)

logger.info('Feed {} proxies from the database for a second time validation'.format(len(proxies)))
logger.debug('Feed {} proxies from the database for a second time validation'.format(len(proxies)))

# feed providers at the very beginning
scheduler.feed_providers()

schedule.every(10).minutes.do(feed)
schedule.every(FEED_FROM_DB_INTERVAL_MINUTES).minutes.do(feed_from_db)

logger.debug('cron_thread started.')
logger.info('Start python scheduler')

flag = True

# After 1 minute, try feed_from_db() for the first time
wait_time_for_feed_from_db = 1 if run_once else 60
wait_time_for_feed_from_db = 1 if only_once else 60
time.sleep(wait_time_for_feed_from_db)
feed_from_db()

while True:
while flag:
try:
schedule.run_pending()

if run_once:
raise SystemExit
if only_once:
flag = False
else:
time.sleep(60)
except (KeyboardInterrupt, InterruptedError, SystemExit):
except (KeyboardInterrupt, InterruptedError):
logger.info('Stopping python scheduler')
break

logger.debug('cron_thread exited.')


class Scheduler(object):

Expand All @@ -134,37 +119,40 @@ def __init__(self):
self.cron_thread = None
self.validator_pool = ThreadPoolExecutor(max_workers=int(get_config('validation_pool', default='31')))

def start(self, run_once=False):
def start(self):
"""
Start the scheduler with processes for worker (fetching candidate proxies from different providers),
and validator threads for checking whether the fetched proxies are able to use.
        :param run_once: if True, scheduler will run each task only once then exit when they all finish
"""
self.cron_thread = Thread(target=cron_schedule, args=(self, run_once), daemon=True)
self.worker_process = Process(target=fetch_ips, args=(self.worker_queue, self.validator_queue, run_once))
self.validator_thread = Thread(target=validate_ips, args=(self.validator_queue, self.validator_pool, run_once))
logger.info('Scheduler starts...')

self.cron_thread = Thread(target=cron_schedule, args=(self,), daemon=True)
self.worker_process = Process(target=fetch_ips, args=(self.worker_queue, self.validator_queue))
self.validator_thread = Thread(target=validate_ips, args=(self.validator_queue, self.validator_pool))

self.cron_thread.daemon = True
self.worker_process.daemon = True
self.validator_thread.daemon = True

self.cron_thread.start()
self.worker_process.start()
self.worker_process.start() # Python will wait for all process finished
logger.info('worker_process started')
self.validator_thread.start()
logger.info('validator_thread started')

def join(self):
"""
Wait for worker processes and validator threads
"""
try:
if self.cron_thread and self.cron_thread.is_alive():
self.cron_thread.join()
if self.worker_process and self.worker_process.is_alive():
while (self.worker_process and self.worker_process.is_alive()) or (
self.validator_thread and self.validator_thread.is_alive()):
try:
self.worker_process.join()
if self.validator_thread and self.validator_thread.is_alive():
self.validator_thread.join()
except (KeyboardInterrupt, SystemExit):
pass
except (KeyboardInterrupt, SystemExit):
break

def feed_providers(self):
logger.debug('feed {} providers...'.format(len(all_providers)))
Expand All @@ -177,8 +165,3 @@ def stop(self):
self.worker_process.terminate()
# self.validator_thread.terminate() # TODO: 'terminate' the thread using a flag
self.validator_pool.shutdown(wait=False)

def is_alive(self):
return (self.cron_thread and self.cron_thread.is_alive()) or \
(self.worker_process and self.worker_process.is_alive()) or \
(self.validator_thread and self.validator_thread.is_alive())
2 changes: 0 additions & 2 deletions scylla/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import math

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

from .loggings import logger
from .tcpping import ping
Expand Down
1 change: 0 additions & 1 deletion scylla/web/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from scylla.web.server import start_web_server
from scylla.web.server import start_web_server_non_blocking
13 changes: 2 additions & 11 deletions scylla/web/server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import math
import os
from multiprocessing import Process

from playhouse.shortcuts import model_to_dict
from sanic import Sanic
from sanic.request import Request
from sanic.response import json
from sanic_cors import CORS

from scylla.config import get_config
from scylla.database import ProxyIP
from scylla.loggings import logger

Expand Down Expand Up @@ -139,12 +137,5 @@ async def api_v1_stats(request: Request):
})


def start_web_server(workers=1):
    """Run the Sanic web server (blocking), reading bind address from config.

    :param workers: number of Sanic worker processes (default 1).
    """
    bind_host = str(get_config('web_host', default='0.0.0.0'))
    bind_port = int(get_config('web_port', default='8899'))
    app.run(host=bind_host, port=bind_port, workers=workers)

def start_web_server_non_blocking(workers=1):
    """Launch the web server in a daemon child process.

    :param workers: number of Sanic worker processes, forwarded to
        ``start_web_server``.
    :return: the started ``multiprocessing.Process``.
    """
    web_process = Process(
        target=start_web_server, daemon=True, kwargs={'workers': workers})
    web_process.start()
    return web_process
def start_web_server(host='0.0.0.0', port=8899):
    """Start the Sanic web server; blocks until the server stops.

    :param host: interface to bind to.
    :param port: TCP port to listen on.
    """
    app.run(host=host, port=port)

0 comments on commit bd16f69

Please sign in to comment.