Wrong logic in Platform.wait_for_work, and simplify it with a @property attribute
Hello @dbeltran,
Reading this function for !475, I realized that its docs and code could be improved. This is low priority, so I think it can wait for 4.2.0 or later.
It's late in the day, so my reasoning may be a bit flawed, but I think the current logic has a few issues:
process_log starts as False, which may already be wrong: there may be logs waiting before the function is even called. Yet the process always sleeps for at least one second, even when that is unnecessary.
Then, if self.work_event is set or self.recovery_queue is not empty, but cleanup_event is not set, the loop keeps sleeping one second per iteration until the range is exhausted, even though process_log is already True. It would be better to leave the loop as soon as there is work.
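To make the two points above concrete, here is a hypothetical reproduction of the current first loop of wait_for_work, with time.sleep replaced by a counter. FakeClock, StubPlatform and current_first_loop are illustrative stand-ins, not Autosubmit code; the loop body mirrors the current implementation with the default sleep_time of 60.

```python
import queue
import threading


class FakeClock:
    """Hypothetical stand-in for ``time``: counts seconds instead of sleeping."""

    def __init__(self):
        self.slept = 0

    def sleep(self, seconds):
        self.slept += seconds


class StubPlatform:
    """Hypothetical stub with only the attributes the loop reads."""

    def __init__(self):
        self.work_event = threading.Event()
        self.cleanup_event = threading.Event()
        self.recovery_queue = queue.Queue()


def current_first_loop(platform, clock, sleep_time=60):
    """Mirrors the first loop of the current ``wait_for_work``."""
    process_log = False
    for _ in range(sleep_time, 0, -1):
        clock.sleep(1)  # Sleeps before checking anything.
        if platform.work_event.is_set() or not platform.recovery_queue.empty():
            process_log = True  # Work is noted, but the loop does not exit.
        if platform.cleanup_event.is_set():
            process_log = True
            break  # Only the cleanup event ends the loop early.
    return process_log


busy = StubPlatform()
busy.work_event.set()  # Work is pending before the first sleep.
clock = FakeClock()
print(current_first_loop(busy, clock), clock.slept)  # True 60

closing = StubPlatform()
closing.cleanup_event.set()  # Cleanup already requested.
clock = FakeClock()
print(current_first_loop(closing, clock), clock.slept)  # True 1
```

Even with work pending from the start, the sketch spends the full 60 simulated seconds before returning, and even the cleanup case pays one second.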
A few lines below, there is a while loop whose condition is timeout being greater than 0, or any of the same conditions that set process_log to True a few lines above. So, if I am reading the boolean logic correctly, it sleeps one second at a time until the timeout is exhausted, and keeps sleeping for as long as there are logs to process.
I think the intention was to sleep one second at a time while the timeout was greater than zero and there were no logs to process (i.e. the inverted condition?).
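The difference between the current `or` condition and the presumed `and not` intent can be checked with plain booleans. In this sketch, StubPlatform, has_logs, current, intended and count_sleeps are hypothetical helpers, one iteration stands for one second, and it assumes nothing drains recovery_queue or clears the events while the loop runs (in the current code work_event is only cleared after the loop). Under that assumption, once a log is pending the current condition never becomes False on its own, so only the max_sleeps cap stops it.

```python
import queue
import threading


class StubPlatform:
    """Hypothetical stub with only the attributes the loop condition reads."""

    def __init__(self):
        self.work_event = threading.Event()
        self.cleanup_event = threading.Event()
        self.recovery_queue = queue.Queue()

    def has_logs(self):
        # Same three checks that set process_log to True in the current code.
        return (not self.recovery_queue.empty()
                or self.cleanup_event.is_set()
                or self.work_event.is_set())


def current(timeout, platform):
    # Condition as written today: "or" keeps looping while logs are pending.
    return timeout > 0 or platform.has_logs()


def intended(timeout, platform):
    # Presumed intent: keep waiting only while time remains and nothing is pending.
    return timeout > 0 and not platform.has_logs()


def count_sleeps(timeout, platform, keep_looping, max_sleeps=10_000):
    """Count simulated one-second sleeps; ``max_sleeps`` caps a loop that never ends."""
    sleeps = 0
    while keep_looping(timeout, platform) and sleeps < max_sleeps:
        sleeps += 1  # Stands in for time.sleep(1).
        timeout -= 1
    return sleeps


idle = StubPlatform()
print(count_sleeps(240, idle, current), count_sleeps(240, idle, intended))  # 240 240

busy = StubPlatform()
busy.recovery_queue.put("job")  # Nothing in the loop drains the queue.
print(count_sleeps(240, busy, current), count_sleeps(240, busy, intended))  # 10000 0
```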
The wait_for_work function could have fewer ifs and use a @property to make the logic easier to read, IMHO. Assuming the condition to process logs is isolated, one possible direction to improve it could be something like (not tested, would need more
```diff
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 9af2ca48..28bd98dd 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -854,25 +854,39 @@ class Platform(object):
             self.cleanup_event.set()
             self.log_recovery_process.join()
 
-    def wait_for_work(self, sleep_time=60):
+    @property
+    def process_log(self):
+        """Whether to process logs or not.
+
+        Returns True if the ``work_event`` was set (set in ``job.update_parameters``).
+        Returns True if there are items in ``recovery_queue`` (populated by ``self.add_job_to_log_recover``).
+        Returns True if the ``cleanup_event`` was set (set in the atexit handler function).
+
+        :returns: True if there are logs waiting to be processed. False otherwise.
         """
-        This function waits for the work_event to be set or the cleanup_event to be set.
+        return self.work_event.is_set() or not self.recovery_queue.empty() or self.cleanup_event.is_set()
+
+    def wait_for_work(self, sleep_time: int = 1) -> bool:
         """
-        process_log = False
-        for remaining in range(sleep_time, 0, -1):  # Min time to wait unless clean-up signal is set
-            time.sleep(1)
-            if self.work_event.is_set() or not self.recovery_queue.empty():
-                process_log = True
-            if self.cleanup_event.is_set():
-                process_log = True
-                break
-        if not process_log:  # If no work, wait until the keep_alive_timeout is reached or any signal is set to end the process.
-            timeout = self.keep_alive_timeout - sleep_time
-            while timeout > 0 or not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set():
-                time.sleep(1)
-                timeout -= 1
-                if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set():
-                    process_log = True
+        Waits until there are logs to be processed, or until the keep alive timeout is reached.
+
+        It will check the ``self.process_log`` state and, until it is True, sleep for
+        ``sleep_time`` seconds and check again.
+
+        At the end it will clear the ``self.work_event`` event, and return True if there are logs
+        to be processed, and False otherwise.
+
+        :param sleep_time: sleep time in seconds.
+        :type sleep_time: int
+        :return: True if there are logs to be processed, False otherwise.
+        :rtype: bool
+        """
+        timeout = self.keep_alive_timeout
+        while timeout > 0 and not self.process_log:
+            time.sleep(sleep_time)
+            timeout -= sleep_time
+
+        process_log = self.process_log  # Read before clear(): work_event may be the only reason we woke up.
         self.work_event.clear()
         return process_log
 
@@ -913,10 +927,13 @@ class Platform(object):
                 pass
         return jobs_pending_to_process
 
-    def recover_platform_job_logs(self):
+    def recover_platform_job_logs(self) -> None:
         """
         This function, recovers the logs of the jobs that have been submitted.
-        The exit of this process is controlled by the work_event and cleanup_events of the main process.
+
+        See ``self.process_log`` for the property that tells whether there are logs to be
+        processed, and ``self.wait_for_work`` for the method that polls that property until
+        the keep alive timeout is reached.
         """
         setproctitle.setproctitle(f"autosubmit log {self.expid} recovery {self.name.lower()}")
         identifier = f"{self.name.lower()}(log_recovery):"
@@ -928,7 +945,7 @@ class Platform(object):
             Log.result(f"{identifier} Sucessfully connected.")
             log_recovery_timeout = self.config.get("LOG_RECOVERY_TIMEOUT", 60)
             self.keep_alive_timeout = max(log_recovery_timeout*5, 60*5)
-            while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)):
+            while self.wait_for_work():
                 jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process)
                 if self.cleanup_event.is_set():  # Check if main process is waiting for this child to end.
                     self.recover_job_log(identifier, jobs_pending_to_process)
```
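The proposal can be exercised without waiting in real time. The sketch below copies process_log and the new loop into a hypothetical SketchPlatform, with a FakeClock in place of time.sleep; neither class is Autosubmit code. It assumes timeout is decremented by sleep_time, so that sleep_time keeps its meaning in seconds, and it compares reading process_log before and after work_event.clear(). When work_event is the only thing set, reading the property after clear() reports False, which would end the `while self.wait_for_work():` loop in recover_platform_job_logs.

```python
import queue
import threading


class FakeClock:
    """Hypothetical stand-in for ``time``: counts seconds instead of sleeping."""

    def __init__(self):
        self.slept = 0

    def sleep(self, seconds):
        self.slept += seconds


class SketchPlatform:
    """Hypothetical reduction of ``Platform`` to what the proposal uses."""

    def __init__(self, keep_alive_timeout, clock):
        self.keep_alive_timeout = keep_alive_timeout
        self.work_event = threading.Event()
        self.cleanup_event = threading.Event()
        self.recovery_queue = queue.Queue()
        self._clock = clock

    @property
    def process_log(self):
        return self.work_event.is_set() or not self.recovery_queue.empty() or self.cleanup_event.is_set()

    def wait_for_work(self, sleep_time=1, read_before_clear=True):
        timeout = self.keep_alive_timeout
        while timeout > 0 and not self.process_log:
            self._clock.sleep(sleep_time)
            timeout -= sleep_time  # Assumption: count down in the same unit as the sleep.
        if read_before_clear:
            process_log = self.process_log
            self.work_event.clear()
            return process_log
        self.work_event.clear()  # Variant for comparison: clear first, then read.
        return self.process_log


clock = FakeClock()
platform = SketchPlatform(keep_alive_timeout=300, clock=clock)
print(platform.wait_for_work(), clock.slept)  # False 300

platform.work_event.set()
print(platform.wait_for_work(), clock.slept)  # True 300 (returned without sleeping)

platform.work_event.set()
print(platform.wait_for_work(read_before_clear=False))  # False
```

With work already pending the sketch returns without sleeping at all, while an idle platform waits for the whole keep alive timeout and then returns False.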
If the logic above is correct, I think it would be a bit more readable, easier to maintain, and perform slightly better (but again, I need more
Cheers

