
Async PostgreSQL API #3404

Merged
itowlson merged 1 commit into spinframework:main from itowlson:pg-async-streaming
Mar 9, 2026

Conversation

@itowlson
Collaborator

The eventual intent is to asyncify all Spin APIs that involve I/O (which is, I think, all of them). I'm not expecting to learn all my lessons from this one step, but I'm putting it up for review so I can learn at least some lessons. Then I can make only new mistakes on the next tranche of APIs.

(Tested manually: I think automated tests have to be done via conformance-tests, so may have to be deferred.)

@itowlson itowlson marked this pull request as draft February 19, 2026 00:32

if finish {
return Poll::Ready(Ok(StreamResult::Cancelled));
}
Contributor

This is a bit subtle, but this'll want to go right before Poll::Pending is returned below. The general idea is that the guest has cancelled a read, so the host does whatever it can to try to complete the read; if it can't, then in the end it returns that it's cancelled instead of pending. Here the implementation is pretty simple, since there's no buffering, so cancellation doesn't need any further handling - you just need to move this into the Poll::Pending case below.
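The suggested ordering can be sketched in miniature with plain std types. This is an illustrative stand-in, not the PR's actual code: `poll_read`, the `inner` parameter, and this simplified `StreamResult` enum all approximate the wasmtime types involved. The point is that the cancelled check fires only where `Poll::Pending` would otherwise be returned:

```rust
use std::task::Poll;

// Hypothetical simplified stand-in for the host's stream result type.
#[derive(Debug, PartialEq)]
enum StreamResult {
    Completed,
    Cancelled,
}

// `inner` stands in for whatever the underlying source's poll produced;
// `finish` is true when the guest has cancelled the read.
fn poll_read(inner: Poll<StreamResult>, finish: bool) -> Poll<Result<StreamResult, ()>> {
    match inner {
        // Data (or completion) is available: return it even for a cancelled
        // read, because the host was able to complete the read anyway.
        Poll::Ready(r) => Poll::Ready(Ok(r)),
        // Nothing available and the read was cancelled: report Cancelled
        // instead of staying Pending.
        Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
        Poll::Pending => Poll::Pending,
    }
}
```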

Contributor

There's a built-in impl of FutureProducer for F: Future, so that might be best to use instead (requires wrapping a future's result in a wasmtime::Result<T>)

Ok(r) => r,
Err(e) => {
let err = query_failed(e);
rows_tx.send(Err(err)).await.unwrap();
Contributor

The .unwrap() here (and below in other sends) will want to be handled gracefully to break out of this loop. An error here means that the guest closed the stream without reading all of the results, which is expected to be a normal occurrence, so all the unwraps here I think can just be break
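The suggested pattern can be sketched with a synchronous std channel standing in for the async rows channel (the function and names here are illustrative, not the PR's code). A send error means the receiving end has been dropped - i.e. the guest closed the stream early, which is normal - so the producer loop just stops:

```rust
use std::sync::mpsc;

// Sends rows until either they run out or the receiver is gone.
// Returns how many rows were actually sent.
fn stream_rows(rows: Vec<u32>, tx: mpsc::Sender<u32>) -> usize {
    let mut sent = 0;
    for row in rows {
        // Instead of tx.send(row).unwrap(), treat a closed channel as
        // "guest stopped reading" and break out of the loop gracefully.
        if tx.send(row).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}
```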

Comment on lines +154 to +159
/// Open a connection to the Postgres instance at `address`.
open: static func(address: string) -> result<connection, error>;

/// Open a connection to the Postgres instance at `address`.
@since(version = 4.2.0)
open-async: static async func(address: string) -> result<connection, error>;
Contributor

Is the old open function kept for compat with 4.0.0? And/or is this an exploration of sync/async bindings signatures?

Collaborator Author

I'm not sure what you're asking here. The open function is the one from 4.0.0: it hasn't changed. What I've (tentatively) done is add a separate open-async, which has the same behaviour but is async. Are you politely hinting that I have chosen... poorly? If so, could you elaborate? Thanks!

Contributor

Oh sorry, that makes sense. Basically I was curious why there's two functions here vs just having the async func one, but backwards-compat is a solid reason!

Collaborator

An alternative to consider here might be a separate resource connection-async which then itself doesn't need backward-compatible methods. 🤷

Contributor

@dicej dicej left a comment

LGTM; thanks for working on this!

Test-wise, you could follow this example of defining a test component that uses the new postgres API, then add a test here which uses it, then make sure you have Postgres running locally, and finally run it locally using e.g. cargo build --release && RUST_LOG=info cargo run --manifest-path tests/runtime-tests/Cargo.toml --no-default-features -- $(pwd)/target/release/spin. I don't know offhand what else is needed to ensure that test runs in CI, but that would at least be a start.

let Some(row) = stm.next().await else {
break;
};
// TODO: figure out how to deal with errors here - I think there is like a FutureReader<Error> pattern?
Contributor

What you're doing here looks reasonable to me, FWIW. Not sure we still need a TODO comment here.

The future<result<_, error>> thing you're thinking of is a thing, and I'll discuss it in another comment, but I don't think it applies to this level of abstraction.

rx: tokio::sync::oneshot::Receiver<T>,
}

impl<D, T: 'static + Send> wasmtime::component::FutureProducer<D> for FutureProducer<T> {
Contributor

I'm curious if this could be reused instead.

Collaborator Author

Oh that works very nicely! I can construct the FutureReader off the rx directly rather than having a custom producer in the way. Thanks to you and Alex: I should have had more faith!


/// Query the database.
@since(version = 4.2.0)
query-async: async func(statement: string, params: list<parameter-value>) -> result<tuple<future<list<column>>, stream<result<row, error>>>, error>;
Contributor

The current best practice for fallible streams is to return e.g. tuple<stream<row>, future<result<_, error>>> such that the sending side will close the stream early on error and write the error to the future. That emulates a planned stream<T, E> type in a future edition of the component model.

The idea is that you might want to forward your stream<row> elsewhere but possibly change the error type by mapping the future to some other type. In this case it probably doesn't matter much, and the return type here is already pretty hairy, so I don't think it necessarily needs to change, but I wanted to mention it anyway.

Collaborator Author

My fear with that was that the stream would end and the user would go "well, that's all folks" rather than going "ooh now I better check this other thing to make sure that was a 'normal' end rather than an error." Making it so that stream next() produced an honest-to-goodness error seemed safer. I guess this could be worked around in bindings though.

I think for now I'm inclined to leave it as is, but definitely happy to take further guidance on this.

Contributor

There's somewhat related discussion that happened here, but for WASI we ended up settling on:

  • Instead of result<(..., future<result<(), E>>), E> to instead only return (..., future<result<(), E>>). Basically sacrifice the immediate-ness of the outer result and only return a stream/future pair.
  • While you're right it's a bit more difficult to use, to use (stream<T>, future<result<(), E>>) instead of stream<result<...>>.
  • Here I'd recommend dropping the async part of async func. That way this function would return a stream/future immediately and those would be resolved in the background.

Personally I'd say to stick to the WASI principles here since whatever ends up done for WASI will be equally applicable here. The rough theory as well is that raw usage of these bindings will be somewhat rare and instead will be done through some form of library or similar, which may help mitigate the has-a-footgun property
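The shape those bullets describe might look roughly like this. This is a hypothetical sketch of the convention, not the PR's final WIT; it reuses the interface's existing type names:

```wit
/// Hypothetical sketch of the WASI-style convention: no outer result and
/// no `async func`. The stream/future pair is returned immediately and
/// resolves in the background; the sender closes the stream early on
/// error and writes the error to the future.
query-async: func(statement: string, params: list<parameter-value>)
    -> tuple<stream<row>, future<result<_, error>>>;
```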

Collaborator Author

Following the WASI conventions sounds good. I will rework. Thank you both for the guidance and discussion!

Collaborator Author

@alexcrichton Okay I have been looking at this and I'm afraid I'm struggling. Here is what I am currently trying:

query-async: async func(
  statement: string,
  params: list<parameter-value>)
-> tuple<
  future<list<column>>,
  stream<row>,
  future<result<_, error>>
>;

Now in my test harness the first thing I do is print out the number of columns:

let (cols, mut rows, err) = conn.query_async("select j from jtest".into(), vec![]).await;

let cols = cols.await;
println!("THERE'S {} COLS", cols.len());

// then go on to process `rows`

But... if an error occurs, cols will never resolve. And I can't err.await first because that won't resolve until after all rows have been streamed. I guess I could select on the cols and err futures but egad that feels complicated and heavyweight for what was previously a .await? on the query.

Is that your intention, or am I misunderstanding how the WASI folks see this working? If I've misunderstood, could you provide a little bit more guidance please on the intended signature (and usage if not obvious)? Thanks (and sorry)!

(I realise that you reckon the function should be synchronous, but it wasn't obvious to me how to do that with the current Spin plumbing, so I have punted on that for now. But I will badger someone about it and come back to it later - I am sure this is a Spin factors + Ivan ignorance thing.)

Collaborator Author

Here is where I landed in the guest:

let (cols, mut rows, err) = conn.query_async("select j from jtest".into(), vec![]).await;

let mut cols = std::pin::pin!(cols.into_future());
let mut err = std::pin::pin!(err.into_future());

let selorama = futures::future::select(&mut cols, &mut err).await;

let cols = match selorama {
    futures::future::Either::Left((cols, _err)) => cols,
    futures::future::Either::Right((err, cols)) => match err {
        Ok(_) => cols.await, // or panic/bail? this seems shouldn't-happen
        Err(e) => anyhow::bail!(e),
    }
};

println!("THERE'S {} COLS", cols.len());

I dunno if this is anywhere close to what is envisaged. But I guess it could be encapsulated in a sufficiently beefy wrapper library if so. Row iteration is definitely nicer - there is a check of err after the stream ends but otherwise simpler.

Contributor

Ah ok no yeah this is subtly, but significantly so, different from WASI where there's an additional initial state of columns before the subsequent transmission of rows. WASI things so far have all been "a long list of T followed by an error" where this is "a single T, then a bunch of U, then maybe an error".

For WASI, idiom-wise, the intent is that things stop resolving (e.g. the stream of T), which is the signal to go take a look at the error. I agree this won't work if the columns never resolve: the way this should, in theory, work is that the guest waits for columns, then a bunch of rows, and if at any point anything stops, the error is consulted. Basically select shouldn't be required here, or else IMO it's not the right API to expose to the guest.

Would it be possible to have the columns get resolved with an empty list if an error happens? Or some other sort of sentinel value? That way if columns had an error it would mean that it'd resolve with nothing, rows wouldn't resolve with anything, and the guest could check the error and see what happened.

Another possible signature, if quite gnarly, could be:

query-async: async func(...) -> result<
  tuple<list<column>, stream<row>, future<result<_, error>>>, 
  error,
>;

where here the double-async-ness or double-error-ness goes back closer to what you had originally (sorry if this is whiplash) but is intentional to indicate that fetching the columns is the first layer of results and the second layer of results is the row-and-its-error.

Collaborator Author

I guess I could make columns resolve to an option, with None being the "go look at the error" indicator, but... The double-async one does have the restriction that the host has to resolve columns before rows, but in this case that isn't a problem. None of these feel very lovely eh! I'll have a tinker, but so far double-async seems the most obvious.

@itowlson itowlson force-pushed the pg-async-streaming branch 2 times, most recently from 286455c to 96c06a8 Compare February 27, 2026 00:30
@itowlson
Collaborator Author

itowlson commented Mar 1, 2026

I am not sure what we want to do re limiting host memory usage. For now I have checked the size of each row, and limited the channel to 4 items. I guess I could limit it to 1 if that would help? Another thought I explored was custom sender-receiver types that kept a running total of bytes sent minus bytes read, but I was not sure that was worth pursuing (plus I kind of thought I should make it do backpressure instead of blowing up the entire query and I wasn't sure how to do that). I'd value feedback and guidance on the best approach here!
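The bounded-channel idea described above can be illustrated with std's `sync_channel`, a synchronous stand-in for the async channel the PR actually uses (the function and capacity here are illustrative). With a capacity of 4, the producing side blocks once 4 unread rows are buffered, giving backpressure rather than unbounded host memory growth:

```rust
use std::sync::mpsc;
use std::thread;

// Produces rows through a channel bounded at 4 items and returns what the
// consumer received. The producer blocks whenever the buffer is full, so
// at most 4 rows ever sit in host memory at once.
fn produce_with_backpressure(rows: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel::<u32>(4); // at most 4 rows buffered
    let producer = thread::spawn(move || {
        for row in rows {
            // send() blocks until the consumer makes room; a send error
            // means the consumer went away, so stop producing.
            if tx.send(row).is_err() {
                break;
            }
        }
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```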

@dicej
Contributor

dicej commented Mar 2, 2026

I don't think we need to get fancy when limiting host memory usage. The comprehensive solution will be to move this code into a component (which exports the postgres API and imports wasi-sockets) so that the allocations become guest allocations instead of host ones, but we're not in a position to do that yet. What you're doing now seems fine to me, meanwhile.

@itowlson
Collaborator Author

itowlson commented Mar 3, 2026

More fiddling: now that our query also uses Postgres query_raw, there was a lot of duplication across query and query_async. I had a go at unifying them in this commit: 69225b3, doing all the row conversion and column inference in one place and... it seems to work. (I've verified by adding delays that columns do not get blocked on the rows in the async case.)

However, the way I'm making the columns into a future (using poll_fn) and defaulting them if the stream never populates them (using chain and unfold) seems wildly ugly and rather artificial. So if anyone knows better ways then I would value the education!

(edit: updated the commit hash - had to rebase)

@itowlson itowlson force-pushed the pg-async-streaming branch from 02b7b84 to 69225b3 Compare March 3, 2026 02:35
@itowlson itowlson marked this pull request as ready for review March 4, 2026 01:18
@itowlson itowlson force-pushed the pg-async-streaming branch from 69225b3 to f99c821 Compare March 8, 2026 19:42
return std::task::Poll::Ready(cs.clone());
}
};
cx.waker().wake_by_ref();
Contributor

Waking the waker here says "poll me again ASAP", which seems wrong. Rather than use an Arc<RwLock<Option<Vec<_>>>> and poll_fn here, I'd suggest using a oneshot channel. And instead of StreamExt::chain, map the receive end of the oneshot channel to the default value, e.g.:

receiver.map(|result| result.unwrap_or_default())

...which should produce a vec![] if the sender is dropped without having sent anything.
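A synchronous analogue of this suggestion, using a std channel in place of the async oneshot (the function name is illustrative): the receive end maps a "sender dropped without sending" error to the default empty value, so the columns future always resolves.

```rust
use std::sync::mpsc;

// The columns are sent at most once. If the sender is dropped without
// sending (e.g. the query failed before columns were known), produce the
// default empty vec instead of an error - the spirit of
// `receiver.map(|result| result.unwrap_or_default())`.
fn recv_columns(rx: mpsc::Receiver<Vec<String>>) -> Vec<String> {
    rx.recv().unwrap_or_default()
}
```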

Collaborator Author

That makes sense - I think I found something like that for SQLite and should have come back here to check if I could apply it here! Thanks (and thanks for the receiver.map tip - very nice)!

Collaborator Author

Okay, not quite as simple as SQLite because of different timing of when things arrive, but that has still simplified it enormously. Thank you Joel - this is ready for another look if you have bandwidth.

Contributor

Yeah, looks great now!

Contributor

I'll do another full pass this afternoon.

@itowlson itowlson force-pushed the pg-async-streaming branch from f99c821 to bb66447 Compare March 9, 2026 19:35
Contributor

@dicej dicej left a comment

LGTM; thanks for working on this!

I think adding some test coverage here is do-able per my earlier review comment. Happy to help with that, if needed.

Ok((address, root_ca))
})?;

// TODO: this is from open_async. TODO: so much deduplication
Contributor

Can this deduplication happen now, or is there something blocking it?

Collaborator Author

Thanks for flagging that - I've centralised the logic and removed the duplication. You're welcome to look again of course but I think it's pretty much a formal reorganisation which doesn't invalidate your approval... famous last words though!

@itowlson
Collaborator Author

itowlson commented Mar 9, 2026

I think adding some test coverage here is do-able per my earlier review comment. Happy to help with that, if needed.

My memory of previous Postgres testing was that we had to do it in conformance-tests, which as far as I can tell can't land until the WIT does. Our previous attempts at including PostgreSQL in the in-repo Spin CI have run up against:

then make sure you have Postgres running locally

They always ended up horribly flaky, I think because of difficulties synchronising with server startup (although even mighty delays were never reliable, so maybe something else was going on). The container-based stuff in conformance-tests seemed to get around that, but conformance-tests being a separate repo with a dep makes it hard to update as part of the feature PR. I'm definitely open to discussions/advice, but I'd prefer not to block this PR on cracking this nut. (But would be keen to crack said nut for the long term!)

Signed-off-by: itowlson <ivan.towlson@fermyon.com>
@itowlson itowlson force-pushed the pg-async-streaming branch from bb66447 to 3f9a0d7 Compare March 9, 2026 21:30
@dicej
Contributor

dicej commented Mar 9, 2026

I'm definitely open to discussions/advice, but I'd prefer not to block this PR on cracking this nut. (But would be keen to crack said nut for the long term!)

Sounds fine to me. FWIW, I was able to run the runtime_tests (including the outbound-postgres one) manually against a local Postgres server as a smoke test without touching the conformance-tests repo, but you're probably right that we'll need to update that repo to make any new test run in CI.

@itowlson itowlson enabled auto-merge March 9, 2026 22:03
@itowlson itowlson merged commit c0190b9 into spinframework:main Mar 9, 2026
17 checks passed