Azure持久功能:处理列表
问题内容
我有一个用 python 编写的 azure 持久函数,带有一个协调器和两个活动函数
orchestrator 调用第一个活动函数,并作为回报接收一个列表变量(名称列表和此列表可以在每次执行函数时都是动态的)
下一步是为每个列表项调用第二个活动函数(顺序处理 - 由于第二个活动函数调用的 api 限制)
#dynamically gets generated by the first activity functionpayload=[1,2,3,4] tasks = [context.call_activity("secondfunction",ps) for ps in payload]output = yield context.task_all(tasks)
我在扇出方法中使用的不是串行的,但我似乎无法找到我想要做的事情的替代方法。
此外,在 host.json 文件中,我尝试强制在给定时间只能运行一个活动函数,以避免并行处理
"extensions": { "durableTask": { "maxConcurrentActivityFunctions": 1, "maxConcurrentOrchestratorFunctions": 1 } }
还值得注意的是,我无法将整个列表传递给活动函数,就好像我执行活动函数将花费超过 5-10 分钟,这是 azure 函数的超时限制,因此尝试迭代列表编排功能
但结果不是连续的
非常感谢您的反馈
正确答案
您可以尝试使用以下两种方法来实现您的要求:-
方法 1:-
我的function_app.py:-
import azure.functions as funcimport azure.durable_functions as dfmyapp = df.dfapp(http_auth_level=func.authlevel.anonymous)# http starter@myapp.route(route="orchestrators/{functionname}")@myapp.durable_client_input(client_name="client")async def http_start(req: func.httprequest, client): function_name = req.route_params.get('functionname') instance_id = await client.start_new(function_name, none) # pass the functionname here response = client.create_check_status_response(req, instance_id) return response# orchestrator@myapp.orchestration_trigger(context_name="context")def hello_orchestrator(context): cities = ["seattle", "tokyo", "london"] tasks = [] for city in cities: tasks.append(context.call_activity("hello", city)) # wait for all tasks to complete results = yield context.task_all(tasks) return results# activity@myapp.activity_trigger(input_name="city")def hello(city: str): print(f"processing {city}...") # your activity function logic goes here result = f"hello {city}!" return result
输出:-
函数 url:-
http://localhost:7071/api/orchestrators/hello_orchestrator
方法 2:-
function_app.py:-
import azure.functions as funcimport azure.durable_functions as dfmyApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)# HTTP Starter@myApp.route(route="orchestrators/{functionName}")@myApp.durable_client_input(client_name="client")async def http_start(req: func.HttpRequest, client): function_name = req.route_params.get('functionName') instance_id = await client.start_new(function_name, None) # Pass the functionName here response = client.create_check_status_response(req, instance_id) return response# Orchestrator@myApp.orchestration_trigger(context_name="context")def hello_orchestrator(context): # Call the first activity to get a list of names names_list = yield context.call_activity("get_names") # Process each name sequentially using the second activity results = [] for name in names_list: result = yield context.call_activity("process_name", name) results.append(result) return results# First Activity@myApp.activity_triggerdef get_names(): # Your logic to retrieve a dynamic list of names goes here # For demonstration purposes, returning a hardcoded list return ["John", "Alice", "Bob"]# Second Activity@myApp.activity_trigger(input_name="name")def process_name(name: str): print(f"Processing {name}...") # Your logic to process each name goes here result = f"Hello {name}!" return result