diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py index cb41d4ab1887..855b7f8bb73a 100644 --- a/src/dispatch/cli.py +++ b/src/dispatch/cli.py @@ -882,11 +882,44 @@ def terminate_processes(signum, frame): @signals_group.command("process") def process_signals(): """Runs a continuous process that does additional processing on newly created signals.""" + from sqlalchemy import asc + from dispatch.common.utils.cli import install_plugins - from dispatch.signal.service import main_processing_loop + from dispatch.database.core import SessionLocal, engine, sessionmaker + from dispatch.organization.service import get_all as get_all_organizations + from dispatch.signal import flows as signal_flows + from dispatch.signal.models import SignalInstance install_plugins() - main_processing_loop() + + organizations = get_all_organizations(db_session=SessionLocal()) + while True: + for organization in organizations: + schema_engine = engine.execution_options( + schema_translate_map={ + None: f"dispatch_organization_{organization.slug}", + } + ) + db_session = sessionmaker(bind=schema_engine)() + signal_instances = ( + ( + db_session.query(SignalInstance) + .filter(SignalInstance.filter_action == None) # noqa + .filter(SignalInstance.case_id == None) # noqa + ) + .order_by(asc(SignalInstance.created_at)) + .limit(500) + ) + for signal_instance in signal_instances: + try: + signal_flows.signal_instance_create_flow( + db_session=db_session, + signal_instance_id=signal_instance.id, + ) + except Exception as e: + log.debug(signal_instance) + log.exception(e) + db_session.close() @dispatch_server.command("slack")