问题¶
在我之前的chameleon_stable_diffusion工具中,会在界面打开的第一时间尝试连接服务器,用于拉取可用的AI模型名称,并把它们显示在界面上。当网络和服务器正常的时候,界面打开还算流畅(0.1~0.2秒),但当服务器异常的时候,体验就变得很糟糕:整个编辑器会卡顿数秒钟,直到python抛出网络连接超时的异常消息为止。
如果你做的Chameleon工具在打开时需要读取大量的数据,或者需要进行网络请求,那么也很有可能会遇到类似的问题。这篇Blog和下面的例子,将会帮助你解决这个问题。
简要步骤¶
- 工具实例的
__init__
函数中,创建一个ChameleonTaskExecutor
实例,用于提交异步任务 - 在Chameleon工具的json的
InitPyCmd
创建完工具实例后,执行some_slow_tasks
,模拟耗时任务 some_slow_tasks
中将两个异步任务提交给executor
,并指定任务完成时的回调函数- 回调函数中通过future_id获取任务的返回值,并更新界面
这个例子可以在TAPython_DefaultResources中找到,并会包含在后续版本的TAPython中
Sequence Diagram:
具体介绍¶
前置条件¶
虚幻引擎强制限制了在除了主线程之外的线程中修改Slate小部件的操作,因此我们在非主线程中直接修改Slate控件是不被允许的。
NOTE
SlateGlobals.h: #define SLATE_CROSS_THREAD_CHECK() checkf(IsInGameThread() || IsInSlateThread(), TEXT("Access to Slate is restricted to the GameThread or the SlateLoadingThread!"));
由于有这个限制,我们的现在的目标就变成了两个:
- 在非主线程中执行耗时任务
- 在完成耗时任务后,通过回调函数在主线程中更新Slate控件
在这个例子中MinimalAsyncTaskExample,当我们打开界面的时候,会执行两个"Slow Task",模拟我们读取文件或网络请求等情况。当任务完成时,我们的界面会更新,并将结果显示出来。
来看看具体时怎么做的:
1. 启动时调用“Slow task”¶
{
"TabLabel": "Chameleon Async Example",
"InitTabSize": [350, 186],
"InitTabPosition": [800, 100],
"InitPyCmd": "import Example; chameleon_mini_async_example = Example.MinimalAsyncTaskExample.MinimalAsyncTaskExample(%JsonPath); chameleon_mini_async_example.some_slow_tasks()",
"Root":{
...
}
}
在这里可以看到,在常规启动工具的代码chameleon_mini_async_example = Example.MinimalAsyncTaskExample.MinimalAsyncTaskExample(%JsonPath)
之后,我们执行了chameleon_mini_async_example.some_slow_tasks()
。
注意,我们是在json文件中通过工具的实例变量chameleon_mini_async_example
调用的some_slow_tasks()
,而不是在工具的构造函数__init__
中执行。这能保证我们后续代码some_slow_tasks
执行时,工具和界面已经完成了创建,并且能够通过全局变量访问到该实例。
NOTE
由于我们使用单例模式,所以我们在多次打开工具的时候,Chameleon工具只会执行一次__inti__方法。如果我们在InitCmd中通过importlib.reload去重新加载模块,那么Chameleon工具会重新执行__init__方法。
当然,我们也可以在工具执行的其他阶段提交异步任务。例如点击按钮后,发送网络请求(比如Stable-diffusion)等慢任务。这个例子中的按钮就模拟了这种情况。
2. 提交异步任务¶
在some_slow_tasks
中, 我们添加了两次异步任务self.slow_async_task
, 并在两次调用时传入了不同的参数。在这两个任务结束之后,回调函数self.on_task_finish
分别会被调用。
def some_slow_tasks(self):
self.show_busy_icon(True)
self.executor.submit_task(self.slow_async_task, args=[2], on_finish_callback=self.on_task_finish)
self.executor.submit_task(self.slow_async_task, args=[3], on_finish_callback=self.on_task_finish)
TIP
不同异步任务的执行顺序是不确定的
在这里,我们通过一个executor
来提交异步任务。它负责管理异步任务的执行,以及在任务完成时执行回调函数。
在具体实现中,我们使用了concurrent.futures
库中的ThreadPoolExecutor
来实现异步任务的提交和执行。
def submit_task(self, task:Callable, args=None, kwargs=None, on_finish_callback: Union[Callable, str] = None)-> int:
在submit_task
中,我们可以传入一个任务函数task
,以及任务函数的参数args
和kwargs
。我们还可以传入一个回调函数on_finish_callback
,在任务完成时会被调用。
on_finish_callback可以是一个函数或者一个字符串。当这个函数有参数时,我们会将任务future_id
值作为第一个参数传入回调函数
CAUTION
Lambda函数不能作为回调函数传入,因为在异步任务执行时,Lambda函数的上下文会丢失。
你可能会注意到这里我们并没有为on_finish_callback传入其他参数,在绝大部分的情况下,我们可以通过future_id
区分不同的任务,来获得任务的返回值,而不需要额外的参数。
当然,如果一定要传入其他参数,可以使用函数闭包生成一个新的函数,或者使用字符串版本的回调函数。
self.executor.submit_task(self.slow_async_task, args=[3]
, on_finish_callback="chameleon_mini_async_example.some_callback(%, other_param='task3')")
TIP
"%" 会被替换为任务的future_id
值
3. 异步任务的执行¶
这部分的内容对于使用者来说是透明的,我们只需要关注任务的提交和回调函数的实现即可。如不需要了解内部实现细节,可以忽略这部分内容。
这个例子中,我们使用ThreadPoolExecutor来执行异步任务。 ThreadPoolExecutor是一个线程池,它可以管理多个线程,并且可以在需要时创建新的线程。文档: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
在submit_task
函数中,为Future添加完成时的回调函数future.add_done_callback(_func)
时, 我们通过闭包,创建了一个新的临时函数_func
,该函数将通过unreal.PythonBPLib.exec_python_command(cmd, force_game_thread=True)
执行submit_task中传入的回调函数的字符串版本。这部分的实现是Unreal Engine特有的。
...
def _func(_future):
unreal.PythonBPLib.exec_python_command(cmd, force_game_thread=True)
future.add_done_callback(_func)
...
在这个例子中,self.on_task_finish
将被转换成"chameleon_mini_async_example.on_task_finish"
并通过exec_python_command
在主线程中执行。如果传入的函数是其他的模块的函数或者静态函数,需要保证在全局范围内能够访问到这个函数。
如果对如何将callable
类型的回调函数转换为字符串版本的回调函数感兴趣,可以查看ChameleonTaskExecutor.py
中的get_cmd_str_from_callable
函数。
TIP
除了ThreadPoolExecutor,用其他线程池或者进程池也是可以的。只要保证在任务完成时,回调函数能够被正确执行即可。
4. 完成时执行回调¶
由于前文提到的限制,我们不能在异步任务中直接访问Slate控件。因此,我们将对界面的修改操作放在回调函数中,这样我们就可以在主线程中安全地修改Slate控件。
def on_task_finish(self, future_id:int):
# This function will be called in the main thread. so it's safe to modify Slate widget here.
future = self.executor.get_future(future_id)
if future is None:
unreal.log_warning(f"Can't find future: {future_id}")
else:
self.data.set_text(self.ui_text_block, f"Task done, result: {future.result()}")
if not self.executor.is_any_task_running():
self.show_busy_icon(False)
print(f"on_task_finish. Future: {future_id}, result: {future.result() if future else 'invalid'}")
Tips¶
哪些任务应该归为异步Slow Task¶
- 对大文件,数据的读取
- 网络请求
- 其他非UE资源的操作
不应该放在异步Slow Task中的任务¶
- 对UE资源的操作
- 对Slate控件的操作
哪些代码应该放在回调¶
- 任务完成的通知
- 直接对界面的修改操作