Wrong logic in Platform.wait_for_work, and simplify it with a @property attribute

Hello @dbeltran,

While reading this function for !475, I realized that the docs and code could probably be improved. It has low priority, so I think it is for 4.2.0 or later.

It's late in the day, so my logic may be a bit flawed, but I think the current logic has a few issues:

https://earth.bsc.es/gitlab/es/autosubmit/-/blob/02c71e4d522fe5ede77fe83c5caebdf05f1827dd/autosubmit/platforms/platform.py#L861-868

process_log starts as False, which could actually be wrong: maybe there are already logs to process. Yet the process always sleeps at least one second, even when that is unnecessary.

Then, if self.work_event is set or self.recovery_queue is not empty, but cleanup_event is not set, the loop keeps sleeping one second at a time until the range is exhausted, even though process_log is already True. It would be better to exit earlier.
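To make these two points concrete, here is a minimal standalone sketch of the original first loop. It is not Autosubmit code: the `Platform` attributes are passed in as arguments, and `time.sleep` is replaced by a hypothetical `FakeClock` so the seconds are counted instead of waited.

```python
import queue
import threading


class FakeClock:
    """Hypothetical stand-in for time.sleep that only counts seconds."""

    def __init__(self) -> None:
        self.elapsed = 0

    def sleep(self, seconds: int) -> None:
        self.elapsed += seconds


def original_first_loop(work_event, recovery_queue, cleanup_event, sleep_time, clock):
    """Mirrors the first loop of the original wait_for_work (clock replaces time)."""
    process_log = False
    for _ in range(sleep_time, 0, -1):
        clock.sleep(1)  # sleeps before checking anything
        if work_event.is_set() or not recovery_queue.empty():
            process_log = True  # recorded, but the loop keeps going
        if cleanup_event.is_set():
            process_log = True
            break  # only the cleanup event ends the loop early
    return process_log


# Work is already pending before the loop starts.
work_event = threading.Event()
work_event.set()
clock = FakeClock()
result = original_first_loop(work_event, queue.Queue(), threading.Event(), 60, clock)
print(result, clock.elapsed)  # True 60: the full minute is spent anyway

# Clean-up already requested: still one second of sleep before noticing it.
cleanup_event = threading.Event()
cleanup_event.set()
cleanup_clock = FakeClock()
original_first_loop(threading.Event(), queue.Queue(), cleanup_event, 60, cleanup_clock)
print(cleanup_clock.elapsed)  # 1
```

With work pending from the start, the loop still burns all 60 iterations; only the cleanup event shortens it, and even then after one second.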

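A few lines below, there is a while loop that keeps running while timeout is greater than 0 or while any of the conditions that set process_log to True above still holds (recovery_queue not empty, cleanup_event set, or work_event set). So, if I am reading the boolean logic correctly, it keeps sleeping one second at a time until the timeout is exhausted, and then keeps sleeping for as long as there are logs to process. Since nothing in this loop clears cleanup_event, it looks like setting it while the loop runs would keep the process sleeping indefinitely.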

I think the intention was to sleep one second at a time as long as the timeout is greater than zero and there are no logs to process (i.e. the inverted condition?).
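Reducing both conditions to booleans makes the difference easier to see. In this sketch `has_logs` is a hypothetical name standing for the three checks in the original loop (recovery_queue not empty, cleanup_event set, work_event set).

```python
from itertools import product


def original_keeps_sleeping(timeout: int, has_logs: bool) -> bool:
    # Condition of the second loop in the original code.
    return timeout > 0 or has_logs


def intended_keeps_sleeping(timeout: int, has_logs: bool) -> bool:
    # Presumed intention: wait only while there is time left AND nothing to do.
    return timeout > 0 and not has_logs


for timeout, has_logs in product((5, 0), (False, True)):
    print(
        f"timeout={timeout} has_logs={has_logs!s:5} "
        f"original={original_keeps_sleeping(timeout, has_logs)!s:5} "
        f"intended={intended_keeps_sleeping(timeout, has_logs)}"
    )
```

In both rows with has_logs=True the original keeps sleeping while the intended version stops; once timeout reaches 0, the original only stops when has_logs turns False again.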

https://earth.bsc.es/gitlab/es/autosubmit/-/blob/02c71e4d522fe5ede77fe83c5caebdf05f1827dd/autosubmit/platforms/platform.py#L871

IMHO, wait_for_work could have fewer ifs and use a @property to make the logic easier to read. Assuming the condition to process logs can be isolated, one possible direction to improve it could be something like the following (not tested; I would need to look into it more and write some tests to verify this change):

diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 9af2ca48..28bd98dd 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -854,27 +854,40 @@ class Platform(object):
             self.cleanup_event.set()
             self.log_recovery_process.join()
 
-    def wait_for_work(self, sleep_time=60):
+    @property
+    def process_log(self):
+        """Whether to process logs or not.
+
+        Returns True if the ``work_event`` was set (set in ``job.update_parameters``).
+        Returns True if there are items in ``recovery_queue`` (populated by ``self.add_job_to_log_recover``).
+        Returns True if the ``cleanup_event`` was set (set at the atexit handler function).
+
+        :returns: True if there are logs waiting to be processed. False otherwise.
         """
-        This function waits for the work_event to be set or the cleanup_event to be set.
+        return self.work_event.is_set() or not self.recovery_queue.empty() or self.cleanup_event.is_set()
+
+    def wait_for_work(self, sleep_time: int = 1) -> bool:
         """
-        process_log = False
-        for remaining in range(sleep_time, 0, -1):  # Min time to wait unless clean-up signal is set
-            time.sleep(1)
-            if self.work_event.is_set() or not self.recovery_queue.empty():
-                process_log = True
-            if self.cleanup_event.is_set():
-                process_log = True
-                break
-        if not process_log: # If no work, wait until the keep_alive_timeout is reached or any signal is set to end the process.
-            timeout = self.keep_alive_timeout - sleep_time
-            while timeout > 0 or not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set():
-                time.sleep(1)
-                timeout -= 1
-            if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set():
-                process_log = True
+        Waits until there are logs to be processed, or until the keep alive timeout is reached.
+
+        It checks the ``self.process_log`` state and, until it is True, sleeps for ``sleep_time``
+        seconds before checking again.
+
+        At the end it clears the ``self.work_event`` event, and returns True if there are logs
+        to be processed, and False otherwise.
+
+        :param sleep_time: seconds to sleep between checks.
+        :type sleep_time: int
+        :return: True if there are logs to be processed, False otherwise.
+        :rtype: bool
+        """
+        timeout = self.keep_alive_timeout
+        while timeout > 0 and not self.process_log:
+            time.sleep(sleep_time)
+            timeout -= sleep_time
+        has_logs = self.process_log  # read before clearing work_event
         self.work_event.clear()
-        return process_log
+        return has_logs
 
     def recover_job_log(self, identifier, jobs_pending_to_process):
         job = None
@@ -913,10 +926,13 @@ class Platform(object):
                 pass
         return jobs_pending_to_process
 
-    def recover_platform_job_logs(self):
+    def recover_platform_job_logs(self) -> None:
         """
         This function, recovers the logs of the jobs that have been submitted.
-        The exit of this process is controlled by the work_event and cleanup_events of the main process.
+
+        See ``self.process_log`` for the property that controls whether there are logs to be processed,
+        or not. And ``self.wait_for_work`` for the function that loops checking that property limited by
+        a timeout.
         """
         setproctitle.setproctitle(f"autosubmit log {self.expid} recovery {self.name.lower()}")
         identifier = f"{self.name.lower()}(log_recovery):"
@@ -928,7 +944,7 @@ class Platform(object):
         Log.result(f"{identifier} Sucessfully connected.")
         log_recovery_timeout = self.config.get("LOG_RECOVERY_TIMEOUT", 60)
         self.keep_alive_timeout = max(log_recovery_timeout*5, 60*5)
-        while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)):
+        while self.wait_for_work():
             jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process)
             if self.cleanup_event.is_set():  # Check if main process is waiting for this child to end.
                 self.recover_job_log(identifier, jobs_pending_to_process)

If the logic above is correct, I think it would be a bit more readable, easier to maintain, and perform slightly better (but again, I need to look into it more before saying anything for sure).
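Since the proposal is untested, here is a minimal sketch of the kind of unit tests that could verify it. `PlatformSketch` is a hypothetical stand-in holding only what the proposed `process_log` and `wait_for_work` use (plain `threading.Event` and `queue.Queue` objects, which may differ from the real ones), and `time.sleep` is patched with `unittest.mock` so no test actually waits. The sketch deliberately reads `process_log` into a local variable before clearing `work_event`: re-reading the property after `clear()` would return False whenever `work_event` was the only trigger, ending the recovery loop too early, and the first test checks exactly that. It also decrements the timeout by `sleep_time` so the timeout stays in seconds for any sleep value.

```python
import queue
import threading
import time
import unittest
from unittest import mock


class PlatformSketch:
    """Hypothetical stand-in with only what the proposed wait_for_work needs."""

    def __init__(self, keep_alive_timeout: int) -> None:
        self.keep_alive_timeout = keep_alive_timeout
        self.work_event = threading.Event()
        self.cleanup_event = threading.Event()
        self.recovery_queue = queue.Queue()

    @property
    def process_log(self) -> bool:
        return (self.work_event.is_set()
                or not self.recovery_queue.empty()
                or self.cleanup_event.is_set())

    def wait_for_work(self, sleep_time: int = 1) -> bool:
        timeout = self.keep_alive_timeout
        while timeout > 0 and not self.process_log:
            time.sleep(sleep_time)
            timeout -= sleep_time  # keep the timeout in seconds
        has_logs = self.process_log  # read before work_event is cleared
        self.work_event.clear()
        return has_logs


class WaitForWorkTest(unittest.TestCase):

    @mock.patch("time.sleep")
    def test_returns_true_at_once_when_work_is_pending(self, sleep):
        platform = PlatformSketch(keep_alive_timeout=300)
        platform.work_event.set()
        self.assertTrue(platform.wait_for_work())
        sleep.assert_not_called()
        self.assertFalse(platform.work_event.is_set())

    @mock.patch("time.sleep")
    def test_returns_false_after_timeout_without_work(self, sleep):
        platform = PlatformSketch(keep_alive_timeout=300)
        self.assertFalse(platform.wait_for_work())
        self.assertEqual(sleep.call_count, 300)

    @mock.patch("time.sleep")
    def test_stops_when_cleanup_is_set_while_waiting(self, sleep):
        platform = PlatformSketch(keep_alive_timeout=300)

        def set_cleanup_on_third_sleep(_seconds):
            # Mimics the main process asking the child to finish.
            if sleep.call_count == 3:
                platform.cleanup_event.set()

        sleep.side_effect = set_cleanup_on_third_sleep
        self.assertTrue(platform.wait_for_work())
        self.assertEqual(sleep.call_count, 3)
```

Saved as e.g. `test_wait_for_work.py`, the tests run with `python -m unittest test_wait_for_work`; porting them to the real `Platform` would need a way to build one without a remote connection.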

Cheers