Skip to content

Commit

Permalink
Merge branch 'yankovs-feat/delete_more_query_samples' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
danielplohmann committed May 10, 2024
2 parents 5e34a9a + 3acc541 commit 07b7b18
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 36 deletions.
72 changes: 40 additions & 32 deletions mcrit/Worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,59 +192,67 @@ def doDbCleanup(self, progress_reporter=NoProgressReporter()):
unmapped_failed = self.getQueueData(0, 0, method="getMatchesForUnmappedBinary", state="failed")
mapped_failed = self.getQueueData(0, 0, method="getMatchesForMappedBinary", state="failed")
smda_finished = self.getQueueData(0, 0, method="getMatchesForSmdaReport", state="finished")
recent_samples = set([])
protected_sample_ids = set([])
samples_to_be_deleted = {}
jobs_to_be_deleted = []
LOGGER.info(
f"Fetching data from the queues, iterating {len(unmapped_finished) + len(unmapped_finished) + len(unmapped_failed) + len(mapped_failed) + len(smda_finished)} items."
f"Collected all data from the queues, now iterating {len(unmapped_finished) + len(mapped_finished) + len(unmapped_failed) + len(mapped_failed) + len(smda_finished)} items."
)
# first iterate and collect all potentially stale sample_entries by their submission/processing timestamp
for sample_entry in self._storage.getSamples(start_index=0, limit=0, is_query=True):
if sample_entry.timestamp is None:
LOGGER.warning(f"Found query_samples entry without timestamp: {sample_entry}")
continue
if sample_entry.sha256 not in samples_to_be_deleted:
samples_to_be_deleted[sample_entry.sha256] = []
if sample_entry.timestamp < time_cutoff:
samples_to_be_deleted[sample_entry.sha256].append(sample_entry)

for job_collection in [unmapped_finished, mapped_finished]:
for job_dict in job_collection:
job = Job(job_dict, None)
result = self.getResultForJob(job.job_id)
reference_sample_entry = SampleEntry.fromDict(result["info"]["sample"])
# we keep those query samples that have been submitted since the cutoff
if job.finished_at > time_cutoff:
recent_samples.add(job.sha256)
protected_sample_ids.add(reference_sample_entry.sample_id)
else:
jobs_to_be_deleted.append(job)
if job.sha256 not in samples_to_be_deleted:
sample_entry = self._storage.getSampleBySha256(job.sha256, is_query=True)
if sample_entry:
samples_to_be_deleted[job.sha256] = sample_entry
if reference_sample_entry.sha256 not in samples_to_be_deleted:
samples_to_be_deleted[reference_sample_entry.sha256] = []
samples_to_be_deleted[reference_sample_entry.sha256].append(reference_sample_entry)
for failed_job_collection in [unmapped_failed, mapped_failed]:
for failed_job_dict in failed_job_collection:
job = Job(failed_job_dict, None)
# failed job doesn't have finished_at, delete anyway
jobs_to_be_deleted.append(job)
if job.sha256 not in samples_to_be_deleted:
sample_entry = self._storage.getSampleBySha256(job.sha256, is_query=True)
if sample_entry:
samples_to_be_deleted[job.sha256] = sample_entry
result = self.getResultForJob(job.job_id)
reference_sample_entry = SampleEntry.fromDict(result["info"]["sample"])
if job.started_at > time_cutoff:
protected_sample_ids.add(reference_sample_entry.sample_id)
else:
jobs_to_be_deleted.append(job)
if reference_sample_entry.sha256 not in samples_to_be_deleted:
samples_to_be_deleted[reference_sample_entry.sha256] = []
samples_to_be_deleted[reference_sample_entry.sha256].append(reference_sample_entry)
LOGGER.info(f"Decoding SMDA reports for SHA256 hashes.")
for job_dict in smda_finished:
job = Job(job_dict, None)
# find sample info based on smda report, extract sha256 via regex
smda_dict_start = self.queue.getFileByHash(job.sha256, max_bytes=2048)
match = re.search(b'"sha256": "(?P<sha256>[A-Fa-f0-9]{64})"', smda_dict_start)
if match:
smda_file_sha256 = match.group('sha256').decode("ascii")
# we keep those query samples that have been submitted since the cutoff
if job.finished_at > time_cutoff:
recent_samples.add(smda_file_sha256)
else:
jobs_to_be_deleted.append(job)
if smda_file_sha256 not in samples_to_be_deleted:
sample_entry = self._storage.getSampleBySha256(smda_file_sha256, is_query=True)
if sample_entry:
samples_to_be_deleted[smda_file_sha256] = sample_entry
result = self.getResultForJob(job.job_id)
reference_sample_entry = SampleEntry.fromDict(result["info"]["sample"])
# we keep those query samples that have been submitted since the cutoff
if job.finished_at > time_cutoff:
protected_sample_ids.add(reference_sample_entry.sample_id)
else:
LOGGER.warn(f"Target SHA256 not found in first 2048 bytes of SMDA report with file hash {job.sha256}.")
for sha256 in recent_samples:
samples_to_be_deleted.pop(sha256, None)
jobs_to_be_deleted.append(job)
if reference_sample_entry.sha256 not in samples_to_be_deleted:
samples_to_be_deleted[reference_sample_entry.sha256] = []
samples_to_be_deleted[reference_sample_entry.sha256].append(reference_sample_entry)
LOGGER.info(f"Found {len(samples_to_be_deleted)} query samples that can be deleted")
progress_reporter.set_total(len(samples_to_be_deleted))
for sample_sha256, sample_entry in samples_to_be_deleted.items():
for sample_sha256, sample_entries in samples_to_be_deleted.items():
LOGGER.info(f"Deleting {sample_entry.sample_id}.")
self._storage.deleteSample(sample_entry.sample_id)
for sample_id in set([sample_entry.sample_id for sample_entry in sample_entries]):
if sample_id not in protected_sample_ids:
self._storage.deleteSample(sample_id)
progress_reporter.step()
# now remove the respective data also from the queue, which also deletes the results from GridFS
LOGGER.info(f"Found {len(jobs_to_be_deleted)} query jobs that can be deleted.")
Expand Down
6 changes: 6 additions & 0 deletions mcrit/libs/mongoqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@ def delete_job(self, job_id, with_result=True):
deletable_job = self._getCollection().find_one({"_id": job_id})
if deletable_job:
self.updateQueueCounter(deletable_job["payload"]["method"], self._identifyJobState(deletable_job), -1)
# if job has file parameters, we need to remove them from GridFS as well
print(deletable_job)
if "file_params" in deletable_job["payload"]:
file_params_dict = json.loads(deletable_job["payload"]["file_params"])
for k, file_object_id in file_params_dict.items():
self._getFs().delete(ObjectId(file_object_id))
if with_result:
# delete result from GridFS
self._getFs().delete(ObjectId(deletable_job["result"]))
Expand Down
10 changes: 7 additions & 3 deletions mcrit/storage/MongoDbStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,14 @@ def getSamplesByFamilyId(self, family_id: int) -> Optional[List["SampleEntry"]]:
samples = self._getDb().samples.find({"family_id": family_id}, {"_id":0})
return [SampleEntry.fromDict(sample_document) for sample_document in samples]

def getSamples(self, start_index: int, limit: int, is_query=False) -> List["SampleEntry"]:
    """Return a slice of sample entries, regardless of sample_id ordering.

    Args:
        start_index: index of the first sample document to return.
        limit: maximum number of entries to return (MongoDB treats 0 as "no limit").
        is_query: if True, read from the query_samples collection instead of samples.

    Returns:
        a list of SampleEntry objects for the selected slice (annotation fixed:
        the previous Optional["SampleEntry"] hint did not match the actual list return).
    """
    # Both collections are paged identically; only the source collection differs.
    collection = self._getDb().query_samples if is_query else self._getDb().samples
    return [
        SampleEntry.fromDict(sample_document)
        for sample_document in collection.find().skip(start_index).limit(limit)
    ]

def clearStorage(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion mcrit/storage/StorageInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,14 @@ def getSamplesByFamilyId(self, family_id: int) -> Optional[List["SampleEntry"]]:
"""
raise NotImplementedError

def getSamples(self, start_index: int, limit: int) -> Optional["SampleEntry"]:
def getSamples(self, start_index: int, limit: int, is_query=False) -> Optional["SampleEntry"]:
"""Iterate the sample collection and provide a slice (regardless of sample_id),
covering up to <limit> items, starting from start_index
Args:
start_index: <n>th sample in the collection
limit: number of entries to return at most
is_query: search through query_samples instead
Returns:
the respective SampleEntries
Expand Down

0 comments on commit 07b7b18

Please sign in to comment.