こんにちは。
ふとデコレータで遊びたくなったので書いてみました。
import datetime as dt
import functools
from typing import Callable


def time_limit(begin: dt.time, end: dt.time, default: Callable):
    """Build a decorator that runs a function only inside a daily time window.

    When the decorated function is called, the current local wall-clock time
    is compared with the window ``[begin, end]`` (both ends inclusive).
    Inside the window the decorated function runs; outside it ``default``
    is called with the very same arguments instead.

    A window whose ``end`` is earlier than its ``begin`` is treated as
    wrapping past midnight (e.g. 22:00 - 06:00).

    Args:
        begin: Start of the allowed window.
        end: End of the allowed window.
        default: Fallback callable invoked outside the window.

    Returns:
        A decorator; the wrapped function returns whatever the selected
        callable (the original or ``default``) returns.
    """
    def deco(func: Callable):
        # wraps() keeps the decorated function's __name__ / __doc__ intact.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = dt.datetime.now().time()
            if begin <= end:
                # Ordinary window contained in a single day.
                in_window = begin <= now <= end
            else:
                # Window wraps around midnight: late evening OR early morning.
                in_window = now >= begin or now <= end
            if in_window:
                return func(*args, **kwargs)
            return default(*args, **kwargs)
        return wrapper
    return deco


def off_mode(*args, **kwargs) -> str:
    """Fallback used outside working hours: ignore the arguments and decline."""
    print("No, I'm off mode now.")
    return "Do nothing"


@time_limit(dt.time(9), dt.time(17), off_mode)
def pass_task(task: str) -> str:
    """Accept ``task``; only reached between 09:00 and 17:00."""
    print(f"Sure, I'll do that task '{task}'.")
    return "I'm doing it"


if __name__ == '__main__':
    response = pass_task("wash dishes")
    # Case1 (outside 09:00-17:00): No, I'm off mode now.
    # Case2 (inside 09:00-17:00):  Sure, I'll do that task 'wash dishes'.
    print(response)
    # Case1: Do nothing
    # Case2: I'm doing it
おまけで、一定の時間内に終わらなければ関数の実行を強制終了するデコレータも作ってみました。
import functools
import threading
import time
from queue import Empty, Queue


def stop_in_10_sec(func, timeout: float = 10.0):
    """Decorate ``func`` so the caller stops waiting after ``timeout`` seconds.

    The function runs in a daemon thread while the caller waits for its
    result. If no result arrives in time, ``TimeoutError`` is raised in the
    caller.

    Note that Python threads cannot be killed: after a timeout the worker
    thread keeps running in the background and is only discarded when the
    interpreter exits (it is a daemon thread).

    Args:
        func: The callable to guard.
        timeout: Maximum number of seconds to wait for ``func``
            (defaults to 10, matching the decorator's name).

    Returns:
        A wrapper that returns ``func``'s return value.

    Raises:
        TimeoutError: ``func`` did not finish within ``timeout`` seconds.
        Exception: Any exception raised by ``func`` is re-raised in the caller.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # A fresh queue per call: state shared between calls would let a
        # leftover notification from an earlier call end a later one early.
        outcome = Queue(maxsize=1)

        def runner():
            try:
                outcome.put((True, func(*args, **kwargs)))
            except Exception as exc:
                # Hand the failure to the caller instead of losing it here.
                outcome.put((False, exc))

        threading.Thread(target=runner, daemon=True).start()
        try:
            succeeded, payload = outcome.get(timeout=timeout)
        except Empty:
            name = getattr(func, "__name__", repr(func))
            raise TimeoutError(
                f"{name} did not finish within {timeout} seconds"
            ) from None
        if succeeded:
            return payload
        raise payload
    return wrapper


@stop_in_10_sec
def count_up():
    """Print 0..19, one number per second, then report completion."""
    for i in range(20):
        print(i)
        time.sleep(1)
    return "Finished!"


if __name__ == '__main__':
    print(count_up())  # counts up to 9, then TimeoutError stops the script
よっしゃできた〜と思ったあたりで、実は timeout_decorator というパッケージがあることを知りました。車輪の再発明だった〜〜〜
import time

import timeout_decorator


@timeout_decorator.timeout(10)
def count_up():
    """Print 0..19, one number per second, under a 10-second timeout."""
    for i in range(20):
        print(i)
        time.sleep(1)
    return "Finished!"


if __name__ == '__main__':
    print(count_up())  # counts up to 9, then the timeout stops it
追記(2021/10/21): さらにおまけで、指定した時間外になると、実行中でも中止されるようなデコレータを作りました。もしかしてこれも車輪の再発明・・・?(震声)
import datetime as dt
from functools import wraps
from threading import Event, Thread
import time
from typing import Callable, Any, NoReturn
from queue import Queue


class TimeInterval:
    """A closed daily time window ``[begin, end]``.

    If ``end`` is earlier than ``begin`` the window wraps past midnight,
    e.g. 22:00 - 06:00 contains both 23:00 and 03:00.
    """

    def __init__(self,
                 begin: dt.time = dt.time.min,
                 end: dt.time = dt.time.max):
        self._begin = begin
        self._end = end

    @property
    def begin(self) -> dt.time:
        """Start of the window (read-only)."""
        return self._begin

    @property
    def end(self) -> dt.time:
        """End of the window (read-only)."""
        return self._end

    def __str__(self) -> str:
        return f"{self.begin:%H:%M:%S.%f} - {self.end:%H:%M:%S.%f}"

    def __contains__(self, item) -> bool:
        """Return whether the ``datetime.time`` ``item`` lies in the window.

        Raises:
            TypeError: ``item`` is not a ``datetime.time``.
        """
        if not isinstance(item, dt.time):
            # NotImplemented is a sentinel value, not an exception class,
            # so an unsupported operand is reported with a real TypeError.
            raise TypeError(
                f"expected datetime.time, got {type(item).__name__}"
            )
        if self.begin <= self.end:
            return self.begin <= item <= self.end
        # Window wraps around midnight.
        return item >= self.begin or item <= self.end


class OvertimeError(Exception):
    """Raised when work is attempted, or still running, outside work time."""


class WorkTimeManager:
    """Bookkeeping for one guarded call: the clock plus two signalling queues.

    One queue carries "wake up" notifications for the waiting caller, the
    other carries the wrapped function's return value.
    """

    def __init__(self,
                 begin: dt.time = dt.time.min,
                 end: dt.time = dt.time.max):
        self._work_time = TimeInterval(begin, end)
        self._now = dt.datetime.now().time()
        self._q_master = Queue()
        self._q_return = Queue()

    # ----- for time control -----
    def update_now(self) -> None:
        """Refresh the cached current local time."""
        self._now = dt.datetime.now().time()

    def is_overtime(self) -> bool:
        """Return True if the cached time is outside the work window."""
        return self._now not in self._work_time

    def raise_overtime_error(self) -> NoReturn:
        """Raise ``OvertimeError`` describing the cached time and the window."""
        err_msg = f"It's {self._now}, " \
                  f"out of work time ({self._work_time})"
        raise OvertimeError(err_msg)

    # ----- for queue control -----
    def notify_master(self) -> None:
        """Wake up whoever is blocked in ``await_notification``."""
        self._q_master.put("")

    def await_notification(self) -> None:
        """Block until ``notify_master`` has been called."""
        self._q_master.get()

    def put_return_value(self, value) -> None:
        """Store the wrapped function's return value."""
        self._q_return.put(value)

    @property
    def yet_returned(self) -> bool:
        """True while NO return value has been stored yet."""
        return self._q_return.empty()

    def get_return_value(self) -> Any:
        """Pop the stored return value without blocking."""
        return self._q_return.get_nowait()


def work_time(
        begin: dt.time = dt.time.min,
        end: dt.time = dt.time.max,
        check_frequency: float = 1.,
):
    """Build a decorator that only lets a function run during work time.

    The decorated function is refused if called outside ``[begin, end]``,
    and the caller is released with ``OvertimeError`` if the window ends
    while the function is still running.

    Python threads cannot be killed, so after an overtime abort the function
    keeps running in a daemon thread until the interpreter exits.

    Args:
        begin: Start of the work window.
        end: End of the work window (may be earlier than ``begin`` to wrap
            past midnight).
        check_frequency: Seconds to sleep between clock checks while the
            function is running.

    Returns:
        A decorator; the wrapped function returns the original return value.

    Raises:
        OvertimeError: Called outside work time, or work time ended before
            the function finished.
        Exception: Any exception raised by the function is re-raised.
    """
    def deco(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # A fresh manager per call: its clock is read right now, and no
            # notification or return value can leak in from another call.
            man = WorkTimeManager(begin, end)
            if man.is_overtime():
                man.raise_overtime_error()

            finished = Event()
            failures = []

            def func_for_q() -> None:
                try:
                    man.put_return_value(func(*args, **kwargs))
                except Exception as exc:
                    # Keep the error so the caller is not left waiting.
                    failures.append(exc)
                finally:
                    finished.set()
                    man.notify_master()

            def keep_time(freq: float) -> None:
                # Stops as soon as the function is done, so no watcher
                # thread outlives its call.
                while not finished.is_set():
                    man.update_now()
                    if man.is_overtime():
                        man.notify_master()
                        break
                    finished.wait(freq)

            Thread(target=func_for_q, daemon=True).start()
            Thread(target=keep_time, args=(check_frequency,),
                   daemon=True).start()

            # Resume as soon as either thread notifies.
            man.await_notification()

            if failures:
                raise failures[0]
            if man.yet_returned:
                man.raise_overtime_error()
            return man.get_return_value()

        return wrapper

    return deco


# **********************************
# Example
# **********************************
@work_time(dt.time(7), dt.time(23))  # seven-eleven
def count_up() -> None:
    """Print an ever-increasing counter once per second, forever."""
    i = 0
    while True:
        print(i)
        i += 1
        time.sleep(1)


if __name__ == '__main__':
    print(count_up())