Bootstrap Chameleon Logo

Chameleon工具启动优化:异步执行耗时任务

问题

在我之前的chameleon_stable_diffusion工具中,会在界面打开的第一时间尝试连接服务器,用于拉取可用的AI模型名称,并把它们显示在界面上。当网络和服务器正常的时候,界面打开还算流畅(0.1~0.2秒),但当服务器异常的时候,体验就变得很糟糕:整个编辑器会卡顿数秒钟,直到python抛出网络连接超时的异常消息为止。

Alt text

如果你做的Chameleon工具在打开时需要读取大量的数据,或者需要进行网络请求,那么也很有可能会遇到类似的问题。这篇Blog和下面的例子,将会帮助你解决这个问题。

G43_MinimalAsyncTaskExample

简要步骤

  1. 工具实例的__init__函数中,创建一个ChameleonTaskExecutor实例,用于提交异步任务
  2. 在Chameleon工具的json的InitPyCmd创建完工具实例后,执行some_slow_tasks,模拟耗时任务
  3. some_slow_tasks 中将两个异步任务提交给executor,并指定任务完成时的回调函数
  4. 回调函数中通过future_id获取任务的返回值,并更新界面

这个例子可以在TAPython_DefaultResources中找到,并会包含在后续版本的TAPython

Sequence Diagram:

minimal_async_task_example_diagram_dark

具体介绍

前置条件

虚幻引擎强制限制了在除了主线程之外的线程中修改Slate小部件的操作,因此我们在非主线程中直接修改Slate控件是不被允许的。

NOTE
SlateGlobals.h: #define SLATE_CROSS_THREAD_CHECK() checkf(IsInGameThread() || IsInSlateThread(), TEXT("Access to Slate is restricted to the GameThread or the SlateLoadingThread!"));

由于有这个限制,我们的现在的目标就变成了两个:

  1. 在非主线程中执行耗时任务
  2. 在完成耗时任务后,通过回调函数在主线程中更新Slate控件

在这个例子中MinimalAsyncTaskExample,当我们打开界面的时候,会执行两个"Slow Task",模拟我们读取文件或网络请求等情况。当任务完成时,我们的界面会更新,并将结果显示出来。

来看看具体时怎么做的:

1. 启动时调用“Slow task”

{
    "TabLabel": "Chameleon Async Example",
    "InitTabSize": [350, 186],
    "InitTabPosition": [800, 100],
    "InitPyCmd": "import Example; chameleon_mini_async_example = Example.MinimalAsyncTaskExample.MinimalAsyncTaskExample(%JsonPath); chameleon_mini_async_example.some_slow_tasks()",
    "Root":{
        ...
    }
}

在这里可以看到,在常规启动工具的代码chameleon_mini_async_example = Example.MinimalAsyncTaskExample.MinimalAsyncTaskExample(%JsonPath)之后,我们执行了chameleon_mini_async_example.some_slow_tasks()

注意,我们是在json文件中通过工具的实例变量chameleon_mini_async_example调用的some_slow_tasks(),而不是在工具的构造函数__init__中执行。这能保证我们后续代码some_slow_tasks执行时,工具和界面已经完成了创建,并且能够通过全局变量访问到该实例。

NOTE
由于我们使用单例模式,所以我们在多次打开工具的时候,Chameleon工具只会执行一次__inti__方法。如果我们在InitCmd中通过importlib.reload去重新加载模块,那么Chameleon工具会重新执行__init__方法。

当然,我们也可以在工具执行的其他阶段提交异步任务。例如点击按钮后,发送网络请求(比如Stable-diffusion)等慢任务。这个例子中的按钮就模拟了这种情况。

2. 提交异步任务

some_slow_tasks中, 我们添加了两次异步任务self.slow_async_task, 并在两次调用时传入了不同的参数。在这两个任务结束之后,回调函数self.on_task_finish分别会被调用。

    def some_slow_tasks(self):
        self.show_busy_icon(True)
        self.executor.submit_task(self.slow_async_task, args=[2], on_finish_callback=self.on_task_finish)
        self.executor.submit_task(self.slow_async_task, args=[3], on_finish_callback=self.on_task_finish)

TIP
不同异步任务的执行顺序是不确定的

在这里,我们通过一个executor来提交异步任务。它负责管理异步任务的执行,以及在任务完成时执行回调函数。

在具体实现中,我们使用了concurrent.futures库中的ThreadPoolExecutor来实现异步任务的提交和执行。

    def submit_task(self, task:Callable, args=None, kwargs=None, on_finish_callback: Union[Callable, str] = None)-> int:

submit_task中,我们可以传入一个任务函数task,以及任务函数的参数argskwargs。我们还可以传入一个回调函数on_finish_callback,在任务完成时会被调用。

on_finish_callback可以是一个函数或者一个字符串。当这个函数有参数时,我们会将任务future_id值作为第一个参数传入回调函数

CAUTION
Lambda函数不能作为回调函数传入,因为在异步任务执行时,Lambda函数的上下文会丢失。

你可能会注意到这里我们并没有为on_finish_callback传入其他参数,在绝大部分的情况下,我们可以通过future_id区分不同的任务,来获得任务的返回值,而不需要额外的参数。

当然,如果一定要传入其他参数,可以使用函数闭包生成一个新的函数,或者使用字符串版本的回调函数。

self.executor.submit_task(self.slow_async_task, args=[3]
    , on_finish_callback="chameleon_mini_async_example.some_callback(%, other_param='task3')")

TIP
"%" 会被替换为任务的future_id

3. 异步任务的执行

这部分的内容对于使用者来说是透明的,我们只需要关注任务的提交和回调函数的实现即可。如不需要了解内部实现细节,可以忽略这部分内容。

这个例子中,我们使用ThreadPoolExecutor来执行异步任务。 ThreadPoolExecutor是一个线程池,它可以管理多个线程,并且可以在需要时创建新的线程。文档: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

submit_task函数中,为Future添加完成时的回调函数future.add_done_callback(_func)时, 我们通过闭包,创建了一个新的临时函数_func,该函数将通过unreal.PythonBPLib.exec_python_command(cmd, force_game_thread=True) 执行submit_task中传入的回调函数的字符串版本。这部分的实现是Unreal Engine特有的。

    ...
    def _func(_future):
        unreal.PythonBPLib.exec_python_command(cmd, force_game_thread=True)

    future.add_done_callback(_func)
    ...

在这个例子中,self.on_task_finish将被转换成"chameleon_mini_async_example.on_task_finish"并通过exec_python_command在主线程中执行。如果传入的函数是其他的模块的函数或者静态函数,需要保证在全局范围内能够访问到这个函数。

如果对如何将callable类型的回调函数转换为字符串版本的回调函数感兴趣,可以查看ChameleonTaskExecutor.py中的get_cmd_str_from_callable函数。

TIP
除了ThreadPoolExecutor,用其他线程池或者进程池也是可以的。只要保证在任务完成时,回调函数能够被正确执行即可。

4. 完成时执行回调

由于前文提到的限制,我们不能在异步任务中直接访问Slate控件。因此,我们将对界面的修改操作放在回调函数中,这样我们就可以在主线程中安全地修改Slate控件。

def on_task_finish(self, future_id:int):
    # This function will be called in the main thread. so it's safe to modify Slate widget here.
    future = self.executor.get_future(future_id)
    if future is None:
        unreal.log_warning(f"Can't find future: {future_id}")
    else:
        self.data.set_text(self.ui_text_block, f"Task done, result: {future.result()}")
        if not self.executor.is_any_task_running():
            self.show_busy_icon(False)

    print(f"on_task_finish. Future: {future_id}, result: {future.result() if future else 'invalid'}")

Tips

哪些任务应该归为异步Slow Task

  • 对大文件,数据的读取
  • 网络请求
  • 其他非UE资源的操作

不应该放在异步Slow Task中的任务

  • 对UE资源的操作
  • 对Slate控件的操作

哪些代码应该放在回调

  • 任务完成的通知
  • 直接对界面的修改操作

参考链接