以下の内容はhttps://smooth-pudding.hatenablog.com/entry/2021/10/21/215320より取得しました。


デコレータで遊ぶ(python) :: 特定の時間以外は挙動を変える

こんにちは。
ふとデコレータで遊びたくなったので書いてみました。

import datetime as dt
from typing import Callable


def time_limit(begin: dt.time, end: dt.time, default: Callable):
    def deco(func: Callable):
        def wrapper(*args, **kwargs):
            now = dt.datetime.now().time()
            if (begin <= now <= end) \
                    or (now <= end < begin) \
                    or (end < begin <= now):
                return func(*args, **kwargs)
            else:
                return default(*args, **kwargs)

        return wrapper

    return deco


def off_mode(*args, **kwargs) -> str:
    print("No, I'm off mode now.")
    return "Do nothing"


@time_limit(dt.time(9), dt.time(17), off_mode)
def pass_task(task: str) -> str:
    print(f"Sure, I'll do that task '{task}'.")
    return "I'm doing it"


if __name__ == '__main__':
    response = pass_task("wash dishes")
    # Case1: No, I'm off mode now.
    # Case2: Sure, I'll do that task 'wash dishes.'
    print(response)
    # Case1: Do nothing
    # Case2: I'm doing it

おまけで、一定の時間内に終わらなければ関数の実行を強制終了するデコレータも作ってみました

import time
import threading
from queue import Queue


def stop_in_10_sec(func):
    q_master = Queue()
    q_return = Queue()

    def stopper():
        time.sleep(10)
        q_master.put("")

    def func_with_queue(*args, **kwargs):
        nonlocal q_return, q_master
        q_return.put(func(*args, **kwargs))
        q_master.put("")

    def wrapper(*args, **kwargs):
        th_master = threading.Thread(target=q_master.get)
        th_func = threading.Thread(target=func_with_queue, args=args, kwargs=kwargs)
        th_func.setDaemon(True)
        th_stopper = threading.Thread(target=stopper)
        th_stopper.setDaemon(True)

        th_master.start()
        th_func.start()
        th_stopper.start()
        th_master.join()
        if q_return.empty():
            raise TimeoutError
        else:
            return q_return.get_nowait()

    return wrapper


@stop_in_10_sec
def count_up():
    for i in range(20):
        print(i)
        time.sleep(1)
    return "Finished!"


if __name__ == '__main__':
    print(count_up())  # 9 まで数えて停止

よっしゃできた〜と思ったあたりで、実は timeout_decorator というパッケージがあることを知りました。車輪の再発明だった〜〜〜

import timeout_decorator


@timeout_decorator.timeout(10)
def count_up():
    for i in range(20):
        print(i)
        time.sleep(1)
    return "Finished!"


if __name__ == '__main__':
    print(count_up())  # 9 まで数えて停止

追記(2021/10/21):: さらにおまけで、指定した時間外になると、実行中でも中止されるようなデコレータを作りました。もしかしてこれも車輪の再発明・・・?(震声)

import datetime as dt
from functools import wraps
from threading import Thread
import time
from typing import Callable, Any
from queue import Queue


class TimeInterval:
    def __init__(self, begin: dt.time = dt.time.min, end: dt.time = dt.time.max):
        self.__begin = begin
        self.__end = end

    @property
    def begin(self) -> dt.time:
        return self.__begin

    @property
    def end(self) -> dt.time:
        return self.__end

    def __str__(self) -> str:
        return f"{self.begin:%H:%M:%S.%f} - {self.end:%H:%M:%S.%f}"

    def __contains__(self, item) -> bool:
        if isinstance(item, dt.time):
            return (self.begin <= item <= self.end) \
                   or (item <= self.end < self.begin) \
                   or (self.end < self.begin <= item)
        raise NotImplemented


class OvertimeError(Exception):
    pass


class WorkTimeManager:
    def __init__(self, begin: dt.time = dt.time.min, end: dt.time = dt.time.max):
        self.__work_time = TimeInterval(begin, end)
        self.__now = dt.datetime.now().time()
        self.__q_master = Queue()
        self.__q_return = Queue()

    # ----- for time control -----
    def update_now(self) -> None:
        self.__now = dt.datetime.now().time()

    def is_overtime(self) -> bool:
        return self.__now not in self.__work_time

    def raise_overtime_error(self) -> None:
        err_msg = f"It's {self.__now}, " \
                  f"out of work time ({self.__work_time})"
        raise OvertimeError(err_msg)

    # ----- for queue control -----
    def notify_master(self) -> None:
        self.__q_master.put("")

    def await_notification(self) -> None:
        self.__q_master.get()

    def put_return_value(self, value) -> None:
        self.__q_return.put(value)

    @property
    def yet_returned(self) -> bool:
        return self.__q_return.empty()

    def get_return_value(self) -> Any:
        return self.__q_return.get_nowait()


def work_time(
        begin: dt.time = dt.time.min,
        end: dt.time = dt.time.max,
        check_frequency: float = 1.,
):
    def deco(func: Callable) -> Callable:
        man = WorkTimeManager(begin, end)

        def func_for_q(*args, **kwargs) -> None:
            man.put_return_value(func(*args, **kwargs))
            man.notify_master()

        def keep_time(freq: float) -> None:
            while True:
                man.update_now()
                if man.is_overtime():
                    man.notify_master()
                    break
                time.sleep(freq)

        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            if man.is_overtime():
                man.raise_overtime_error()

            th_master = Thread(target=man.await_notification)
            th_func = Thread(target=func_for_q, args=args, kwargs=kwargs)
            th_func.setDaemon(True)
            th_time = Thread(target=keep_time, args=(check_frequency,))
            th_time.setDaemon(True)

            th_master.start()
            th_func.start()
            th_time.start()
            th_master.join()  # run next line as soon as notified
            if man.yet_returned:
                raise man.raise_overtime_error()
            else:
                return man.get_return_value()

        return wrapper

    return deco


# **********************************
#  Example
# **********************************
@work_time(dt.time(7), dt.time(23))  # seven-eleven
def count_up() -> None:
    i = 0
    while True:
        print(i)
        i += 1
        time.sleep(1)


if __name__ == '__main__':
    print(count_up())



以上の内容はhttps://smooth-pudding.hatenablog.com/entry/2021/10/21/215320より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14