diff --git a/.gitmodules b/.gitmodules index 85783d7..913c306 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,60 +1,72 @@ [submodule "modules/core"] path = modules/core - url = https://github.com/vixcpp/core.git + url = git@github.com-vix:vixcpp/core.git branch = dev [submodule "modules/utils"] path = modules/utils - url = https://github.com/vixcpp/utils.git + url = git@github.com-vix:vixcpp/utils.git branch = dev [submodule "modules/cli"] path = modules/cli - url = https://github.com/vixcpp/cli.git + url = git@github.com-vix:vixcpp/cli.git branch = dev [submodule "modules/json"] path = modules/json - url = https://github.com/vixcpp/json.git + url = git@github.com-vix:vixcpp/json.git branch = dev [submodule "modules/orm"] path = modules/orm - url = https://github.com/vixcpp/orm.git + url = git@github.com-vix:vixcpp/orm.git branch = dev [submodule "modules/websocket"] - path = modules/websocket - url = https://github.com/vixcpp/websocket.git - branch = dev + path = modules/websocket + url = git@github.com-vix:vixcpp/websocket.git + branch = dev + [submodule "modules/middleware"] - path = modules/middleware - url = https://github.com/vixcpp/middleware.git - branch = dev + path = modules/middleware + url = git@github.com-vix:vixcpp/middleware.git + branch = dev + [submodule "modules/p2p"] - path = modules/p2p - url = git@github.com:vixcpp/p2p.git - branch = dev + path = modules/p2p + url = git@github.com-vix:vixcpp/p2p.git + branch = dev + [submodule "modules/net"] - path = modules/net - url = https://github.com/vixcpp/net.git - branch = dev + path = modules/net + url = git@github.com-vix:vixcpp/net.git + branch = dev + [submodule "modules/sync"] - path = modules/sync - url = https://github.com/vixcpp/sync.git - branch = dev + path = modules/sync + url = git@github.com-vix:vixcpp/sync.git + branch = dev + [submodule "modules/cache"] - path = modules/cache - url = https://github.com/vixcpp/cache.git - branch = dev + path = modules/cache + url = git@github.com-vix:vixcpp/cache.git + branch = dev + [submodule "modules/db"] - path = modules/db - url = git@github.com:vixcpp/db.git - branch = dev -[submodule "third_party/asio-src"] - path = third_party/asio-src - url = https://github.com/chriskohlhoff/asio.git + path = modules/db + url = git@github.com-vix:vixcpp/db.git + branch = dev + [submodule "modules/p2p_http"] - path = modules/p2p_http - url = https://github.com/vixcpp/p2p_http.git + path = modules/p2p_http + url = git@github.com-vix:vixcpp/p2p_http.git + branch = dev + +[submodule "third_party/asio-src"] + path = third_party/asio-src + url = git@github.com-vix:chriskohlhoff/asio.git +[submodule "modules/async"] + path = modules/async + url = git@github.com-vix:vixcpp/async.git branch = dev diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d639dc..4cdf5e1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,6 +128,7 @@ option(VIX_ENABLE_P2P "Build Vix P2P module" ON) option(VIX_ENABLE_P2P_HTTP "Build Vix P2P HTTP adapter module" ON) option(VIX_ENABLE_CACHE "Build Vix Cache module" ON) option(VIX_FETCH_DEPS "Allow fetching missing deps from the internet" OFF) +option(VIX_ENABLE_ASYNC "Build Vix Async module" ON) # ---------------------------------------------------- # Tooling / Static analysis @@ -394,6 +395,24 @@ else() message(FATAL_ERROR "Missing 'modules/core'. Run: git submodule update --init --recursive") endif() +# --- Async (optional) --- +set(VIX_HAS_ASYNC OFF) +if (VIX_ENABLE_ASYNC AND EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/modules/async/CMakeLists.txt") + message(STATUS "Adding 'modules/async'...") + add_subdirectory(modules/async async_build) + + if (TARGET vix::async OR TARGET vix_async) + set(VIX_HAS_ASYNC ON) + if (TARGET vix_async AND NOT TARGET vix::async) + add_library(vix::async ALIAS vix_async) + endif() + else() + message(WARNING "Async module added but no vix::async target was exported.") + endif() +else() + message(STATUS "Async: disabled or not present.") +endif() + # --- Net (required by P2P) --- set(VIX_HAS_NET OFF) if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/modules/net/CMakeLists.txt") @@ -619,6 +638,10 @@ if (TARGET vix::sync) target_link_libraries(vix INTERFACE vix::sync) endif() +if (TARGET vix::async) + target_link_libraries(vix INTERFACE vix::async) +endif() + if (TARGET vix::p2p) target_link_libraries(vix INTERFACE vix::p2p) endif() @@ -891,6 +914,11 @@ if (VIX_ENABLE_INSTALL) FILES_MATCHING PATTERN "*.hpp" PATTERN "*.h") endif() + if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/modules/async/include") + install(DIRECTORY modules/async/include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} + FILES_MATCHING PATTERN "*.hpp" PATTERN "*.h") + endif() + if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/modules/net/include") install(DIRECTORY modules/net/include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} FILES_MATCHING PATTERN "*.hpp" PATTERN "*.h") diff --git a/examples/async/00_hello_task.cpp b/examples/async/00_hello_task.cpp new file mode 100644 index 0000000..fa4f181 --- /dev/null +++ b/examples/async/00_hello_task.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +#include +#include +#include +#include + +using vix::async::core::io_context; +using vix::async::core::task; + +static task app(io_context &ctx) +{ + std::cout << "[async] hello from task\n"; + + // Timer: sleep for 50ms (does not block the event loop thread) + co_await ctx.timers().sleep_for(std::chrono::milliseconds(50)); + std::cout << "[async] after timer\n"; + + // Thread pool: run CPU work off the event loop, then resume here + int v = co_await ctx.cpu_pool().submit([]() -> int + { + // pretend CPU work + int sum = 0; + for (int i = 0; i < 100000; ++i) + sum += (i % 7); + return sum; }); + + std::cout << "[async] cpu_pool result = " << v << "\n"; + assert(v >= 0); + + // Stop the runtime once done + ctx.stop(); + co_return; +} + +int main() +{ + io_context ctx; + + // Kick off the app task by posting its coroutine handle to the scheduler. + // Our task starts suspended (initial_suspend = suspend_always), so we must resume it. + auto t = app(ctx); + ctx.post(t.handle()); + + // Run the event loop. It will stop when app() calls ctx.stop(). + ctx.run(); + + std::cout << "[async] done\n"; + return 0; +} diff --git a/examples/async/01_signal_stop.cpp b/examples/async/01_signal_stop.cpp new file mode 100644 index 0000000..c007373 --- /dev/null +++ b/examples/async/01_signal_stop.cpp @@ -0,0 +1,43 @@ +#include + +#include +#include +#include + +using vix::async::core::io_context; +using vix::async::core::task; + +static task app(io_context &ctx) +{ + auto &sig = ctx.signals(); + + // Register signals to handle + sig.add(SIGINT); + sig.add(SIGTERM); + + std::cout << "[async] waiting for SIGINT/SIGTERM (Ctrl+C)\n"; + + sig.on_signal([&](int s) + { + std::cout << "[async] signal received: " << s << " -> stopping\n"; + ctx.stop(); }); + + int s = co_await sig.async_wait(); + std::cout << "[async] async_wait got signal: " << s << " -> stopping\n"; + ctx.stop(); + + co_return; +} + +int main() +{ + io_context ctx; + + auto t = app(ctx); + ctx.post(t.handle()); + + ctx.run(); + + std::cout << "[async] stopped\n"; + return 0; +} diff --git a/examples/async/01_timer.cpp b/examples/async/01_timer.cpp new file mode 100644 index 0000000..8041a57 --- /dev/null +++ b/examples/async/01_timer.cpp @@ -0,0 +1,41 @@ +#include + +#include +#include +#include + +using vix::async::core::io_context; +using vix::async::core::task; + +static task app(io_context &ctx) +{ + std::cout << "[async] timer demo start\n"; + + co_await ctx.timers().sleep_for(std::chrono::milliseconds(100)); + std::cout << "[async] +100ms\n"; + + co_await ctx.timers().sleep_for(std::chrono::milliseconds(200)); + std::cout << "[async] +200ms\n"; + + // Fire-and-forget callback after 150ms + ctx.timers().after(std::chrono::milliseconds(150), [&]() + { std::cout << "[async] after(150ms) callback\n"; }); + + // Wait a bit so the callback can happen before we stop + co_await ctx.timers().sleep_for(std::chrono::milliseconds(250)); + + std::cout << "[async] timer demo done\n"; + ctx.stop(); + co_return; +} + +int main() +{ + io_context ctx; + + auto t = app(ctx); + ctx.post(t.handle()); + + ctx.run(); + return 0; +} diff --git a/examples/async/02_thread_pool.cpp b/examples/async/02_thread_pool.cpp new file mode 100644 index 0000000..f623ad0 --- /dev/null +++ b/examples/async/02_thread_pool.cpp @@ -0,0 +1,59 @@ +#include +#include +#include + +#include +#include +#include + +using vix::async::core::io_context; +using vix::async::core::task; + +static int heavy_work(int n) +{ + // Fake CPU work + int acc = 0; + for (int i = 0; i < 200000; ++i) + acc += (i * n) % 97; + return acc; +} + +static task app(io_context &ctx) +{ + std::cout << "[async] thread_pool demo start\n"; + + // Submit several CPU jobs in sequence (simple demo). + // Later we can add when_all / gather for parallel awaits. + int a = co_await ctx.cpu_pool().submit([] + { return heavy_work(1); }); + int b = co_await ctx.cpu_pool().submit([] + { return heavy_work(2); }); + int c = co_await ctx.cpu_pool().submit([] + { return heavy_work(3); }); + + std::cout << "[async] results: " << a << ", " << b << ", " << c << "\n"; + assert(a != 0 || b != 0 || c != 0); + + // Fire-and-forget job + ctx.cpu_pool().submit([] + { + // This runs on a worker thread + volatile int x = 0; + for (int i = 0; i < 100000; ++i) + x += i; }); + + std::cout << "[async] demo done\n"; + ctx.stop(); + co_return; +} + +int main() +{ + io_context ctx; + + auto t = app(ctx); + ctx.post(t.handle()); + + ctx.run(); + return 0; +} diff --git a/examples/async/03_tcp_echo_server.cpp b/examples/async/03_tcp_echo_server.cpp new file mode 100644 index 0000000..0b3233d --- /dev/null +++ b/examples/async/03_tcp_echo_server.cpp @@ -0,0 +1,101 @@ +#include +#include + +#include +#include +#include +#include + +#include + +using vix::async::core::io_context; +using vix::async::core::task; + +static task handle_client(std::unique_ptr client) +{ + std::cout << "[async] client connected\n"; + + std::vector buf(4096); + + while (client && client->is_open()) + { + std::size_t n = 0; + + try + { + n = co_await client->async_read( + std::span(buf.data(), buf.size())); + } + catch (const std::system_error &e) + { + std::cout << "[async] read error: " << e.code().message() << "\n"; + break; + } + + if (n == 0) + break; + + try + { + co_await client->async_write( + std::span(buf.data(), n)); + } + catch (const std::system_error &e) + { + std::cout << "[async] write error: " << e.code().message() << "\n"; + break; + } + } + + client->close(); + std::cout << "[async] client disconnected\n"; + co_return; +} + +static task server(io_context &ctx) +{ + auto &sig = ctx.signals(); + sig.add(SIGINT); + sig.add(SIGTERM); + sig.on_signal([&](int s) + { + std::cout << "[async] signal " << s << " received -> stopping\n"; + ctx.stop(); }); + + auto listener = vix::async::net::make_tcp_listener(ctx); + + co_await listener->async_listen({"0.0.0.0", 9090}, 128); + std::cout << "[async] echo server listening on 0.0.0.0:9090\n"; + + while (ctx.is_running()) + { + try + { + auto client = co_await listener->async_accept(); + vix::async::core::spawn_detached( + ctx, + handle_client(std::move(client))); + } + catch (const std::system_error &e) + { + std::cout << "[async] accept error: " << e.code().message() << "\n"; + break; + } + } + + listener->close(); + ctx.stop(); + co_return; +} + +int main() +{ + io_context ctx; + + auto t = server(ctx); + ctx.post(t.handle()); + + ctx.run(); + std::cout << "[async] server stopped\n"; + return 0; +} diff --git a/modules/async b/modules/async new file mode 160000 index 0000000..02f5bbe --- /dev/null +++ b/modules/async @@ -0,0 +1 @@ +Subproject commit 02f5bbe414176460392ef80730d877dc4aeb1fea