Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions examples/async/00_hello_task.cpp
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
#include <cassert>
#include <iostream>
#include <thread>

#include <chrono>
#include <vix/console.hpp>
#include <vix/async/core/io_context.hpp>
#include <vix/async/core/task.hpp>
#include <vix/async/core/timer.hpp>
#include <vix/async/core/thread_pool.hpp>

using namespace vix;
using vix::async::core::io_context;
using vix::async::core::task;

static task<void> app(io_context &ctx)
{
std::cout << "[async] hello from task\n";

console.info("[async] hello from task");
// Timer: sleep for 50ms (does not block the event loop thread)
co_await ctx.timers().sleep_for(std::chrono::milliseconds(50));
std::cout << "[async] after timer\n";
console.info("[async] after timer");

// Thread pool: run CPU work off the event loop, then resume here
int v = co_await ctx.cpu_pool().submit([]() -> int
{
int v = co_await ctx.cpu_pool().submit([]() -> int{
// pretend CPU work
int sum = 0;
for (int i = 0; i < 100000; ++i)
sum += (i % 7);
return sum; });
return sum;
});

std::cout << "[async] cpu_pool result = " << v << "\n";
console.info("[async] cpu_pool result =", v);
assert(v >= 0);

// Stop the runtime once done
ctx.stop();
co_return;
Expand All @@ -38,15 +35,10 @@ static task<void> app(io_context &ctx)
int main()
{
io_context ctx;

// Kick off the app task by posting its coroutine handle to the scheduler.
// Our task starts suspended (initial_suspend = suspend_always), so we must resume it.
auto t = app(ctx);
ctx.post(t.handle());

// Run the event loop. It will stop when app() calls ctx.stop().
ctx.run();

std::cout << "[async] done\n";
vix::console.info("[async] done");
return 0;
}
12 changes: 7 additions & 5 deletions examples/async/01_signal_stop.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <iostream>
#include <csignal>

#include <vix/console.hpp>

#include <vix/async/core/io_context.hpp>
#include <vix/async/core/signal.hpp>
Expand All @@ -15,15 +17,15 @@ static task<void> app(io_context &ctx)
sig.add(SIGINT);
sig.add(SIGTERM);

std::cout << "[async] waiting for SIGINT/SIGTERM (Ctrl+C)\n";
vix::console.info("[async] waiting for SIGINT/SIGTERM (Ctrl+C)");

sig.on_signal([&](int s)
{
std::cout << "[async] signal received: " << s << " -> stopping\n";
vix::console.warn("[async] signal received:", s, "-> stopping");
ctx.stop(); });

int s = co_await sig.async_wait();
std::cout << "[async] async_wait got signal: " << s << " -> stopping\n";
vix::console.warn("[async] async_wait got signal:", s, "-> stopping");
ctx.stop();

co_return;
Expand All @@ -38,6 +40,6 @@ int main()

ctx.run();

std::cout << "[async] stopped\n";
vix::console.info("[async] stopped");
return 0;
}
18 changes: 9 additions & 9 deletions examples/async/01_timer.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <iostream>
#include <chrono>

#include <vix/console.hpp>

#include <vix/async/core/io_context.hpp>
#include <vix/async/core/task.hpp>
Expand All @@ -9,22 +11,20 @@ using vix::async::core::task;

static task<void> app(io_context &ctx)
{
std::cout << "[async] timer demo start\n";
vix::console.info("[async] timer demo start");

co_await ctx.timers().sleep_for(std::chrono::milliseconds(100));
std::cout << "[async] +100ms\n";
vix::console.info("[async] +100ms");

co_await ctx.timers().sleep_for(std::chrono::milliseconds(200));
std::cout << "[async] +200ms\n";
vix::console.info("[async] +200ms");

// Fire-and-forget callback after 150ms
ctx.timers().after(std::chrono::milliseconds(150), [&]()
{ std::cout << "[async] after(150ms) callback\n"; });
ctx.timers().after(std::chrono::milliseconds(150), []()
{ vix::console.info("[async] after(150ms) callback"); });

// Wait a bit so the callback can happen before we stop
co_await ctx.timers().sleep_for(std::chrono::milliseconds(250));
vix::console.info("[async] timer demo done");

std::cout << "[async] timer demo done\n";
ctx.stop();
co_return;
}
Expand Down
9 changes: 5 additions & 4 deletions examples/async/02_thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include <cassert>
#include <iostream>
#include <vector>

#include <vix/console.hpp>

#include <vix/async/core/io_context.hpp>
#include <vix/async/core/task.hpp>
#include <vix/async/core/thread_pool.hpp>
Expand All @@ -20,7 +21,7 @@ static int heavy_work(int n)

static task<void> app(io_context &ctx)
{
std::cout << "[async] thread_pool demo start\n";
vix::console.info("[async] thread_pool demo start");

// Submit several CPU jobs in sequence (simple demo).
// Later we can add when_all / gather for parallel awaits.
Expand All @@ -31,7 +32,7 @@ static task<void> app(io_context &ctx)
int c = co_await ctx.cpu_pool().submit([]
{ return heavy_work(3); });

std::cout << "[async] results: " << a << ", " << b << ", " << c << "\n";
vix::console.info("[async] results:", a, b, c);
assert(a != 0 || b != 0 || c != 0);

// Fire-and-forget job
Expand All @@ -42,7 +43,7 @@ static task<void> app(io_context &ctx)
for (int i = 0; i < 100000; ++i)
x += i; });

std::cout << "[async] demo done\n";
vix::console.info("[async] demo done");
ctx.stop();
co_return;
}
Expand Down
32 changes: 16 additions & 16 deletions examples/async/03_tcp_echo_server.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include <iostream>
#include <csignal>
#include <system_error>
#include <vector>

#include <vix/console.hpp>

#include <vix/async/core/io_context.hpp>
#include <vix/async/core/task.hpp>
#include <vix/async/core/signal.hpp>
Expand All @@ -13,7 +16,7 @@ using vix::async::core::task;

static task<void> handle_client(std::unique_ptr<vix::async::net::tcp_stream> client)
{
std::cout << "[async] client connected\n";
vix::console.info("[async] client connected");

std::vector<std::byte> buf(4096);

Expand All @@ -23,12 +26,11 @@ static task<void> handle_client(std::unique_ptr<vix::async::net::tcp_stream> cli

try
{
n = co_await client->async_read(
std::span<std::byte>(buf.data(), buf.size()));
n = co_await client->async_read(std::span<std::byte>(buf.data(), buf.size()));
}
catch (const std::system_error &e)
{
std::cout << "[async] read error: " << e.code().message() << "\n";
vix::console.error("[async] read error:", e.code().message());
break;
}

Expand All @@ -37,18 +39,17 @@ static task<void> handle_client(std::unique_ptr<vix::async::net::tcp_stream> cli

try
{
co_await client->async_write(
std::span<const std::byte>(buf.data(), n));
co_await client->async_write(std::span<const std::byte>(buf.data(), n));
}
catch (const std::system_error &e)
{
std::cout << "[async] write error: " << e.code().message() << "\n";
vix::console.error("[async] write error:", e.code().message());
break;
}
}

client->close();
std::cout << "[async] client disconnected\n";
vix::console.info("[async] client disconnected");
co_return;
}

Expand All @@ -57,28 +58,27 @@ static task<void> server(io_context &ctx)
auto &sig = ctx.signals();
sig.add(SIGINT);
sig.add(SIGTERM);

sig.on_signal([&](int s)
{
std::cout << "[async] signal " << s << " received -> stopping\n";
vix::console.warn("[async] signal", s, "received -> stopping");
ctx.stop(); });

auto listener = vix::async::net::make_tcp_listener(ctx);

co_await listener->async_listen({"0.0.0.0", 9090}, 128);
std::cout << "[async] echo server listening on 0.0.0.0:9090\n";
vix::console.info("[async] echo server listening on 0.0.0.0:9090");

while (ctx.is_running())
{
try
{
auto client = co_await listener->async_accept();
vix::async::core::spawn_detached(
ctx,
handle_client(std::move(client)));
vix::async::core::spawn_detached(ctx, handle_client(std::move(client)));
}
catch (const std::system_error &e)
{
std::cout << "[async] accept error: " << e.code().message() << "\n";
vix::console.error("[async] accept error:", e.code().message());
break;
}
}
Expand All @@ -96,6 +96,6 @@ int main()
ctx.post(t.handle());

ctx.run();
std::cout << "[async] server stopped\n";
vix::console.info("[async] server stopped");
return 0;
}
Loading