Regarding the first question about the "window": this looks to me like you are looking for something like an "async semaphore". In libunifex there is `async_mutex`, which provides the member functions `async_lock()` (returns a sender) and `unlock()`: https://github.com/facebookexperimental/libunifex/blob/main/include/unifex/v1/async_mutex.hpp

Regarding the "deadlock" with `run_loop`: are you aware that `sync_wait` internally provides a `run_loop` that you can use to schedule work on?
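To make the "async semaphore" idea concrete, here is a minimal callback-based sketch in plain C++ (the `async_semaphore`, `acquire`, and `release` names are invented for illustration; libunifex's `async_mutex` is the sender-shaped version of the same pattern). A waiter that can't get a permit is queued instead of blocking a thread, and a later `release()` hands the permit straight to it:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical illustration of an "async semaphore": acquire() never
// blocks a thread; if no permit is free, the continuation is parked and
// resumed by a later release().
class async_semaphore {
  std::mutex m_;
  int permits_;
  std::deque<std::function<void()>> waiters_;

public:
  explicit async_semaphore(int permits) : permits_(permits) {}

  void acquire(std::function<void()> continuation) {
    std::unique_lock lk(m_);
    if (permits_ > 0) {
      --permits_;
      lk.unlock();      // run the continuation outside the lock
      continuation();   // permit available: proceed immediately
    } else {
      waiters_.push_back(std::move(continuation)); // park without blocking
    }
  }

  void release() {
    std::function<void()> next;
    {
      std::lock_guard lk(m_);
      if (waiters_.empty()) {
        ++permits_;
        return;
      }
      next = std::move(waiters_.front());
      waiters_.pop_front();
    }
    next(); // hand the permit straight to the oldest waiter
  }
};
```

A sender-based version would return a sender from `acquire()` and complete it from `release()`, but the queuing logic is the same.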
Hi @maikel, thank you very much for your reply, I really appreciate it. Yes, I think an "async semaphore" would be ideal; perhaps I can review the implementation of `async_mutex` in libunifex and see if it's possible to recreate it in `stdexec`. I was not aware that `sync_wait` provides a `run_loop` internally, so thank you for pointing that out. I guess I'm also wondering, at a higher level, whether it's difficult to build this type of thing today without coroutines. Thanks again, and I'll definitely take a look at what you mentioned.
Hi there,
I've recently been getting interested in `std::execution`, and I've been trying to see if I can recreate algorithms I've used before in async code using the API provided by `std::execution`. Unfortunately, I've been coming up short, and was hoping I might be able to ask for clarification on the optimal/idiomatic way to create a sliding window of work.

The situation I'd like to solve for is: say I have 100 requests (just to keep things simple) and a downstream service I do not want to overwhelm. The service might take a little bit of time to handle each request and can only handle a fixed load, so I'd like to send an initial batch of 10 requests immediately, and then, as each response comes back, send another request straight away, creating a sliding-window effect where there are never more than 10 requests in flight at once.
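The window semantics I'm after can be sketched without any sender machinery at all. This is a hypothetical, single-threaded illustration (all names invented): completing a request immediately launches the next pending one, so the window "slides" without ever blocking a thread:

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical sketch of the sliding-window semantics: at most `window`
// requests are in flight; a completion callback launches the next one.
struct window_dispatcher {
  int window;
  int total;
  int next = 0;          // next request index to launch
  int in_flight = 0;     // requests currently outstanding
  int max_in_flight = 0; // high-water mark, for checking the invariant
  std::function<void(int, std::function<void()>)> send; // send(i, on_done)

  void pump() {
    // Launch requests until the window is full or we run out of work.
    while (in_flight < window && next < total) {
      ++in_flight;
      max_in_flight = std::max(max_in_flight, in_flight);
      int i = next++;
      send(i, [this] {   // completion callback from the "service"
        --in_flight;
        pump();          // slide the window: launch the next request
      });
    }
  }
};
```

To exercise it, a fake service can just queue the completion callbacks and invoke them later (even out of order); the `max_in_flight` high-water mark never exceeds the window size.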
This is the first part of the problem. The next part, building on top of the first, is to store the responses as they come back in an associative container, keyed by the response index. The reason for this is that we want to return the responses in order to an upstream service, but as they might come back out of order, we need to store them in an associative container and have a separate thread with an index/counter that it uses to look up whether a response has come back in slot 0, then slot 1, 2, etc. This allows the results to be streamed back in the order the requests were made (in a simple example, this streaming could just be writing the responses to a file in request order).
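For illustration, the in-order drain described above might look like this in plain C++ (names hypothetical): results arrive keyed by index, and everything contiguous from the next expected index is emitted as soon as it becomes available:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical reorder buffer: out-of-order delivery, in-order emission.
struct reorder_buffer {
  std::unordered_map<int, std::string> results; // index -> result
  int out_index = 0;                            // next index to emit
  std::vector<std::string> emitted;             // stands in for the stream/file

  void deliver(int index, std::string value) {
    results.emplace(index, std::move(value));
    // Drain every contiguous slot starting at out_index.
    for (auto it = results.find(out_index); it != results.end();
         it = results.find(out_index)) {
      emitted.push_back(std::move(it->second));
      results.erase(it);
      ++out_index;
    }
  }
};
```

Delivering index 2 first emits nothing; once 0 and 1 arrive, all three are flushed in order.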
The last thing to mention is that before we write the results to the map, we need to do some further processing on each response to get it into the right shape. This work is CPU-bound, and to avoid blocking the I/O thread, we have a separate thread pool that takes the responses, does the processing, and writes them to the associative container before the results are picked up in order and streamed out (to a file or another service).
Now, to do this the old-school way, I'd send a bunch of async requests with callbacks. As each callback returns, I'd spawn a worker thread and move the response data to it so the processing can happen, and I'd have a mutex around an associative container (probably an `unordered_map` or equivalent for simplicity) that each worker thread writes to when it has finished building its result. A separate thread would then handle streaming the results back; it would be woken on a condition variable each time a result is written to the container, to check whether it's the index it's waiting on. There might be a smarter/better/simpler way to do this, but that's the basic pattern.

Now, if I want to do the equivalent with senders/receivers (`std::execution`), I think I'd need a combination of `counting_scope`, `run_loop` and some different schedulers. I have cobbled something together which looks like this (I'm sure it's hopeless, but it's just a sketch/outline/idea):

```cpp
{
  exec::static_thread_pool cpu_pool(std::thread::hardware_concurrency());
  auto cpu_sch = cpu_pool.get_scheduler();

  stdexec::counting_scope scope;
  stdexec::run_loop loop;
  auto main_sch = loop.get_scheduler();

  struct payload_t {
    float f; // some data...
  };

  struct response_t {
    int index;
  };

  struct data_t {
    int index;
    payload_t payload;
  };

  std::unordered_map<int, data_t> results;
  std::ofstream out{"out.txt"};

  const int request_count = 100;
  int out_index = 0;
  std::atomic<int> active = 0;
  std::counting_semaphore sem(10);

  for (int i = 0; i < request_count; i++) {
    stdexec::spawn(
      stdexec::just()
        | stdexec::then([&sem, &active, i]() noexcept {
            sem.acquire();
            active++;
            std::println(
              "task {} launching - active {}, thread {}", i, active.load(),
              std::this_thread::get_id());
            return response_t{.index = i};
          })
        | stdexec::continues_on(cpu_sch)
        | stdexec::then([&sem, &active](const response_t& response) noexcept {
            std::println(
              "task {} running - active {}, thread {}", response.index,
              active.load(), std::this_thread::get_id());
            sem.release();
            active--;
            // do some processing...
            return data_t{
              .index = response.index,
              .payload = {.f = static_cast<float>(response.index)}};
          })
        | stdexec::continues_on(main_sch)
        | stdexec::then(
            [&results, &out_index, &out](const data_t& data) noexcept {
              std::println(
                "appending results: {}, thread {}", data.index,
                std::this_thread::get_id());
              results[data.index] = data;
              while (out_index < request_count) {
                if (auto ordered_it = results.find(out_index);
                    ordered_it != results.end()) {
                  out_index++;
                  out << ordered_it->second.payload.f + 1.0f << std::endl;
                } else {
                  break;
                }
              }
            }),
      scope.get_token());
  }

  // deadlocks
  loop.run();
  stdexec::sync_wait(scope.join());
}
```

With this code I do get everything written in the correct order in
`out.txt` (as I'm using `std::endl` to flush each write to the output), but it deadlocks with the `run_loop` logic right now, and if I try to do something like this... then I get a very long compile error complaining about `error: no matching function for call to object of type 'stdexec::set_error_t'` (which I'm not sure exactly how to solve; I've tried adding an extra error handler, but that doesn't seem to have helped). I also think the use of a
`semaphore` here isn't great, as I'm completely blocking a thread that could be doing something else useful while waiting for one of the 10 requests to come back (this could potentially be solved with a coroutine, but I was thinking it should be possible to implement with just senders/receivers, though perhaps I'm mistaken on this). I would be incredibly grateful and interested if someone could share a better/valid approach that implements roughly what I'm thinking of. My understanding with
`std::execution` is that all the primitives available today should allow you to build any async computation, even if a friendlier API doesn't yet exist for it. This might be the case in my example, and if it fundamentally relies on a lower-level building block, maybe it's not so simple, but I'd love to know whether something like this is recommended/encouraged versus using raw threads.

Thank you very much for your time, and I look forward to a response in some shape or form (sorry for such a long question; there's no obligation or rush to reply, I just thought I'd try my luck). If there are existing resources such as articles/talks/repos that would be worth looking at to better understand this, by all means point me at those instead. Thank you!