diff --git a/applications/flatware/c-flatware.c b/applications/flatware/c-flatware.c index 4e427019..40fccd8b 100644 --- a/applications/flatware/c-flatware.c +++ b/applications/flatware/c-flatware.c @@ -1,4 +1,5 @@ #include "c-flatware.h" +#include "api.h" #include "filesys.h" #include "flatware-decs.h" @@ -19,7 +20,7 @@ typedef char __attribute__( ( address_space( 10 ) ) ) * externref; typedef struct filedesc { - int32_t offset; + unsigned long offset; int32_t size; __wasi_fdstat_t stat; int32_t file_id; @@ -29,6 +30,10 @@ typedef struct filedesc typedef struct file { + bool in_use; + bool is_dir; + char padding[2]; + // if is_dir, -1, else the ro_mem holding the content int32_t mem_id; int32_t num_fds; struct substring name; @@ -78,10 +83,11 @@ static filedesc fds[N_FDS] = { .fs_flags = 0 } }, // WORKING DIRECTORY }; -static file files[N_FILES] = { { .name = { "stdin", 5 }, .mem_id = StdInROMem, .num_fds = 1 }, - { .name = { "stdout", 6 }, .mem_id = StdOutRWMem, .num_fds = 1 }, - { .name = { "stderr", 6 }, .mem_id = StdErrRWMem, .num_fds = 1 }, - { .name = { ".", 1 }, .mem_id = FileSystemBaseROTable, .num_fds = 1 } }; +static file files[N_FILES] + = { { .in_use = true, .is_dir = false, .name = { "stdin", 5 }, .mem_id = StdInROMem, .num_fds = 1 }, + { .in_use = true, .is_dir = false, .name = { "stdout", 6 }, .mem_id = StdOutRWMem, .num_fds = 1 }, + { .in_use = true, .is_dir = false, .name = { "stderr", 6 }, .mem_id = StdErrRWMem, .num_fds = 1 }, + { .in_use = true, .is_dir = true, .name = { ".", 1 }, .mem_id = -1, .num_fds = 1 } }; static unsigned int* ro_mem_use; @@ -90,7 +96,7 @@ static int64_t random_seed; // Bitset functions static bool bitset_get( unsigned int*, uint32_t ); static void bitset_set( unsigned int*, uint32_t ); -// static void bitset_clear( unsigned int*, uint32_t ); +static void bitset_clear( unsigned int*, uint32_t ); // for each value, first pass the width (T32 or T64), then pass the value // to specify the end, pass TEND @@ -248,6 +254,8 @@ _Noreturn void proc_exit( int32_t rval ) */ int32_t fd_close( int32_t fd ) { + file* file; + FUNC_TRACE( T32, fd, TEND ); if ( fd >= N_FDS ) { @@ -258,9 +266,17 @@ int32_t fd_close( int32_t fd ) return __WASI_ERRNO_BADF; } - files[fds[fd].file_id].num_fds--; + file = &files[fds[fd].file_id]; + file->num_fds--; fds[fd].open = false; + if ( file->num_fds == 0 ) { + file->in_use = false; + if ( !file->is_dir ) { + bitset_clear( ro_mem_use, (uint32_t)file->mem_id ); + } + } + return __WASI_ERRNO_SUCCESS; } @@ -322,10 +338,10 @@ int32_t fd_seek( int32_t fd, int64_t offset, int32_t whence, int32_t retptr0 ) return __WASI_ERRNO_INVAL; } - fds[fd].offset = (int32_t)offset; + fds[fd].offset = (unsigned long)offset; - flatware_mem_to_program_mem( retptr0, (int32_t)&fds[fd].offset, sizeof( fds[fd].offset ) ); - RET_TRACE( fds[fd].offset ); + flatware_mem_to_program_mem( retptr0, (int32_t)&offset, sizeof( offset ) ); + RET_TRACE( offset ); return __WASI_ERRNO_SUCCESS; } @@ -357,9 +373,14 @@ int32_t fd_read( int32_t fd, int32_t iovs, int32_t iovs_len, int32_t retptr0 ) f = files[fds[fd].file_id]; + if ( f.is_dir ) { + return __WASI_ERRNO_BADF; + } + // Iterate over buffers for ( int32_t i = 0; i < iovs_len; ++i ) { - int32_t file_remaining = fds[fd].size - fds[fd].offset; + int32_t file_remaining = fds[fd].size - (int32_t)fds[fd].offset; + if ( file_remaining == 0 ) { break; } @@ -370,8 +391,9 @@ int32_t fd_read( int32_t fd, int32_t iovs, int32_t iovs_len, int32_t retptr0 ) + (int32_t)offsetof( __wasi_iovec_t, buf_len ) ); size_to_read = iobuf_len < file_remaining ? iobuf_len : file_remaining; - ro_mem_to_program_mem( f.mem_id, iobuf_offset, fds[fd].offset, size_to_read ); - fds[fd].offset += size_to_read; + + ro_mem_to_program_mem( f.mem_id, iobuf_offset, (int32_t)fds[fd].offset, size_to_read ); + fds[fd].offset += (unsigned long)size_to_read; total_read += size_to_read; } @@ -407,20 +429,22 @@ int32_t fd_write( int32_t fd, int32_t iovs, int32_t iovs_len, int32_t retptr0 ) f = files[fds[fd].file_id]; + if ( f.is_dir ) { + return __WASI_ERRNO_BADF; + } + for ( int32_t i = 0; i < iovs_len; i++ ) { iobuf_offset = get_i32_program( iovs + i * (int32_t)sizeof( __wasi_iovec_t ) + (int32_t)offsetof( __wasi_iovec_t, buf ) ); iobuf_len = get_i32_program( iovs + i * (int32_t)sizeof( __wasi_iovec_t ) + (int32_t)offsetof( __wasi_iovec_t, buf_len ) ); - while ( fds[fd].offset + iobuf_len > page_size_rw_mem( f.mem_id ) * WASM_RT_PAGESIZE ) { + while ( (int32_t)( fds[fd].offset + (unsigned long)iobuf_len ) + > page_size_rw_mem( f.mem_id ) * WASM_RT_PAGESIZE ) { grow_rw_mem_pages( f.mem_id, iobuf_len / WASM_RT_PAGESIZE + 1 ); } - // if ( fd == STDOUT || fd == STDERR ) { - // program_mem_unsafe_io( (const char*)iobuf_offset, iobuf_len ); - // } - program_mem_to_rw_mem( f.mem_id, fds[fd].offset, iobuf_offset, iobuf_len ); - fds[fd].offset += iobuf_len; + program_mem_to_rw_mem( f.mem_id, (int32_t)fds[fd].offset, iobuf_offset, iobuf_len ); + fds[fd].offset += (unsigned long)iobuf_len; total_written += iobuf_len; } @@ -568,7 +592,7 @@ int32_t fd_allocate( int32_t fd, int64_t offset, int64_t len ) /** * @brief Synchronizes the data of a file descriptor. - * + * * @param fd File descriptor * @return int32_t Status code */ @@ -589,6 +613,8 @@ int32_t fd_datasync( int32_t fd ) */ int32_t fd_filestat_get( int32_t fd, int32_t retptr0 ) { + __wasi_filestat_t stat; + FUNC_TRACE( T32, fd, T32, retptr0, TEND ); if ( fd >= N_FDS ) { @@ -599,6 +625,27 @@ int32_t fd_filestat_get( int32_t fd, int32_t retptr0 ) return __WASI_ERRNO_BADF; } + if ( fds[fd].stat.fs_filetype == __WASI_FILETYPE_REGULAR_FILE ) { + stat.dev = 1; + stat.nlink = 1; + stat.size = (unsigned long)fds[fd].size; + stat.atim = 1757913961613440734; + stat.mtim = 1757913961613440734; + stat.ctim = 1757913961613440734; + stat.filetype = __WASI_FILETYPE_REGULAR_FILE; + flatware_mem_to_program_mem( retptr0, (int32_t)&stat, sizeof( stat ) ); + return 0; + } else if ( fds[fd].stat.fs_filetype == __WASI_FILETYPE_DIRECTORY ) { + stat.dev = 1; + stat.size = 4096; + stat.filetype = __WASI_FILETYPE_DIRECTORY; + stat.atim = 1757913961613440734; + stat.mtim = 1757913961613440734; + stat.ctim = 1757913961613440734; + flatware_mem_to_program_mem( retptr0, (int32_t)&stat, sizeof( stat ) ); + return 0; + } + return __WASI_ERRNO_NOTSUP; } @@ -623,7 +670,8 @@ int32_t fd_filestat_set_size( int32_t fd, int64_t size ) if ( fds[fd].stat.fs_rights_base & __WASI_RIGHTS_FD_FILESTAT_SET_SIZE ) { file f = files[fds[fd].file_id]; - int32_t grow_pages = (int32_t)( ( size - page_size_rw_mem( f.mem_id ) * WASM_RT_PAGESIZE ) / WASM_RT_PAGESIZE + 1 ); + int32_t grow_pages + = (int32_t)( ( size - page_size_rw_mem( f.mem_id ) * WASM_RT_PAGESIZE ) / WASM_RT_PAGESIZE + 1 ); fds[fd].size = (int32_t)size; f = files[fds[fd].file_id]; if ( grow_pages > 0 ) { @@ -637,12 +685,12 @@ int32_t fd_filestat_set_size( int32_t fd, int64_t size ) /** * @brief Updates the timestamp metadata of a file descriptor. - * + * * @param fd File descriptor * @param atim Accessed timestamp * @param mtim Modified timestamp * @param fst_flags Bitmask of __wasi_fstflags_t - * @return int32_t + * @return int32_t */ int32_t fd_filestat_set_times( int32_t fd, int64_t atim, int64_t mtim, int32_t fst_flags ) { @@ -680,6 +728,10 @@ int32_t fd_pread( int32_t fd, int32_t iovs, int32_t iovs_len, int64_t offset, in f = files[fds[fd].file_id]; + if ( f.is_dir ) { + return __WASI_ERRNO_BADF; + } + // Iterate over buffers for ( int32_t i = 0; i < iovs_len; ++i ) { int64_t file_remaining = fds[fd].size - offset; @@ -705,7 +757,7 @@ int32_t fd_pread( int32_t fd, int32_t iovs, int32_t iovs_len, int64_t offset, in /** * @brief Writes to a file descriptor at a given offset without changing the file descriptor's offset. - * + * * @param fd File descriptor * @param iovs Array of __wasi_ciovec_t structs containing buffers to write from * @param iovs_len Number of buffers in iovs @@ -722,7 +774,7 @@ int32_t fd_pwrite( int32_t fd, int32_t iovs, int32_t iovs_len, int64_t offset, i /** * @brief Reads directory entries. - * + * * @param fd File descriptor * @param buf Buffer to store directory entries * @param buf_len Buffer length @@ -732,14 +784,70 @@ int32_t fd_pwrite( int32_t fd, int32_t iovs, int32_t iovs_len, int64_t offset, i */ int32_t fd_readdir( int32_t fd, int32_t buf, int32_t buf_len, int64_t cookie, int32_t retptr0 ) { + struct substring path_str; + unsigned long offset = 0; + unsigned long buf_used = 0; + char null = '\0'; FUNC_TRACE( T32, fd, T32, buf, T32, buf_len, T64, cookie, T32, retptr0, TEND ); + path_str = files[fds[fd].file_id].name; + + if ( find_deep_entry( path_str, FileSystemBaseROTable, ScratchROTable ) == FILESYS_SEARCH_FAILED ) { + return __WASI_ERRNO_NOENT; + } + + if ( is_dir( ScratchROTable ) ) { + attach_tree_ro_table( ScratchROTable, get_ro_table( ScratchROTable, DIRENT_CONTENT ) ); + for ( int i = 0; i < size_ro_table( ScratchROTable ); i++ ) { + __wasi_dirent_t dirent; + struct substring name; + unsigned long offset_next; + + // attach flatware dirent + attach_tree_ro_table( ScratchFileROTable, get_ro_table( ScratchROTable, i ) ); + + name = get_name( ScratchFileROTable ); + + if ( is_dir( ScratchFileROTable ) ) { + dirent.d_type = __WASI_FILETYPE_DIRECTORY; + } else { + dirent.d_type = __WASI_FILETYPE_REGULAR_FILE; + } + + dirent.d_namlen = name.len; + + offset_next = offset + sizeof( dirent ) + name.len + 1; + dirent.d_next = offset_next; + + if ( offset >= cookie && offset_next < cookie + (unsigned long)buf_len ) { + // Copy the content to buf + flatware_mem_to_program_mem( + ( buf + (int32_t)offset - (int32_t)cookie ), (int32_t)&dirent, sizeof( dirent ) ); + flatware_mem_to_program_mem( ( buf + (int32_t)offset - (int32_t)cookie + (int32_t)sizeof( dirent ) ), + (int32_t)name.ptr, + (int32_t)name.len ); + flatware_mem_to_program_mem( + ( buf + (int32_t)offset - (int32_t)cookie + (int32_t)sizeof( dirent ) + (int32_t)name.len ), + (int32_t)&null, + 1 ); + + } else if ( offset_next >= cookie + (unsigned long)buf_len ) { + buf_used = offset - (unsigned long)cookie; + flatware_mem_to_program_mem( retptr0, (int32_t)&buf_used, sizeof( buf_used ) ); + return 0; + } + offset = offset_next; + } + } + + buf_used = offset - (unsigned long)cookie; + flatware_mem_to_program_mem( retptr0, (int32_t)&buf_used, sizeof( buf_used ) ); return 0; } /** * @brief Synchronizes file and metadata changes to storage. - * + * * @param fd File descriptor * @return int32_t Status code */ @@ -752,7 +860,7 @@ int32_t fd_sync( int32_t fd ) /** * @brief Gets the current offset of a file descriptor. - * + * * @param fd File descriptor * @param retptr0 Returns offset as int32_t * @return int32_t Status code @@ -792,7 +900,7 @@ int32_t path_create_directory( int32_t fd, int32_t path, int32_t path_len ) /** * @brief Gets file metadata - * + * * @param fd Base directory for path * @param flags Path flags as __wasi_lookupflags_t * @param path Path to file @@ -802,9 +910,40 @@ int32_t path_create_directory( int32_t fd, int32_t path, int32_t path_len ) */ int32_t path_filestat_get( int32_t fd, int32_t flags, int32_t path, int32_t path_len, int32_t retptr0 ) { + struct substring path_str; + __wasi_filestat_t stat; + FUNC_TRACE( T32, fd, T32, flags, T32, path, T32, path_len, T32, retptr0, TEND ); - return 0; + path_str.len = (size_t)path_len; + path_str.ptr = (char*)calloc( path_str.len + 1, sizeof( char ) ); + program_mem_to_flatware_mem( (int32_t)path_str.ptr, (int32_t)path, (int32_t)path_str.len ); + + if ( find_deep_entry( path_str, FileSystemBaseROTable, ScratchROTable ) == FILESYS_SEARCH_FAILED ) { + return __WASI_ERRNO_NOENT; + } + + if ( is_dir( ScratchROTable ) ) { + stat.dev = 1; + stat.size = 4096; + stat.atim = 1757913961613440734; + stat.mtim = 1757913961613440734; + stat.ctim = 1757913961613440734; + stat.filetype = __WASI_FILETYPE_DIRECTORY; + flatware_mem_to_program_mem( retptr0, (int32_t)&stat, sizeof( stat ) ); + return __WASI_ERRNO_SUCCESS; + } else { + attach_blob_ro_mem( ScratchROMem, get_ro_table( ScratchROTable, DIRENT_CONTENT ) ); + stat.dev = 1; + stat.nlink = 1; + stat.atim = 1757913961613440734; + stat.mtim = 1757913961613440734; + stat.ctim = 1757913961613440734; + stat.size = (unsigned long)byte_size_ro_mem( ScratchROMem ); + stat.filetype = __WASI_FILETYPE_REGULAR_FILE; + flatware_mem_to_program_mem( retptr0, (int32_t)&stat, sizeof( stat ) ); + return 0; + } } /** @@ -834,10 +973,10 @@ int32_t path_filestat_set_times( int32_t fd, /** * @brief Creates a hard link. - * + * * @param old_fd Base directory for source path * @param old_flags Source flags - * @param old_path Source path + * @param old_path Source path * @param old_path_len Source path length * @param new_fd Base directory for destination path * @param new_path Destination path @@ -872,7 +1011,7 @@ int32_t path_link( int32_t old_fd, /** * @brief Reads the target path of a symlink - * + * * @param fd Base directory for path * @param path Path * @param path_len Path length @@ -890,7 +1029,7 @@ int32_t path_readlink( int32_t fd, int32_t path, int32_t path_len, int32_t buf, /** * @brief Removes a directory. - * + * * @param fd Base directory for path * @param path Path to directory * @param path_len Path length @@ -905,7 +1044,7 @@ int32_t path_remove_directory( int32_t fd, int32_t path, int32_t path_len ) /** * @brief Renames a file. - * + * * @param fd Base directory for source path * @param old_path Source path * @param old_path_len Source path length @@ -928,7 +1067,7 @@ int32_t path_rename( int32_t fd, /** * @brief Creates a symbolic link. - * + * * @param old_path Source path * @param old_path_len Source path length * @param fd Base directory for paths @@ -945,7 +1084,7 @@ int32_t path_symlink( int32_t old_path, int32_t old_path_len, int32_t fd, int32_ /** * @brief Unlinks a file at the given path. - * + * * @param fd Base directory for path * @param path Path to file * @param path_len Length of path @@ -981,6 +1120,7 @@ int32_t args_sizes_get( int32_t num_argument_ptr, int32_t size_argument_ptr ) for ( int32_t i = 0; i < num; i++ ) { attach_blob_ro_mem( ScratchROMem, get_ro_table( ArgsROTable, i ) ); size += byte_size_ro_mem( ScratchROMem ); + size += 1; } flatware_mem_to_program_mem( size_argument_ptr, (int32_t)&size, 4 ); @@ -998,6 +1138,7 @@ int32_t args_sizes_get( int32_t num_argument_ptr, int32_t size_argument_ptr ) */ int32_t args_get( int32_t argv_ptr, int32_t argv_buf_ptr ) { + char null = '\0'; int32_t size = 0; int32_t addr = argv_buf_ptr; @@ -1007,7 +1148,8 @@ int32_t args_get( int32_t argv_ptr, int32_t argv_buf_ptr ) size = byte_size_ro_mem( ScratchROMem ); flatware_mem_to_program_mem( argv_ptr + i * (int32_t)sizeof( char* ), (int32_t)&addr, sizeof( char* ) ); ro_mem_to_program_mem( ScratchROMem, addr, 0, size ); - addr += size; + flatware_mem_to_program_mem( addr + size, (int32_t)&null, 1 ); + addr += size + 1; } return __WASI_ERRNO_SUCCESS; @@ -1043,8 +1185,8 @@ int32_t environ_sizes_get( int32_t retptr0, int32_t retptr1 ) } /** - * @brief Gets the environment variables. - * + * @brief Gets the environment variables. + * * @param environ Environment variable array pointer (char**) * @param environ_buf Environment variable buffer pointer (char*) * @return int32_t Status code @@ -1137,10 +1279,15 @@ int32_t path_open( int32_t fd, } } - if ( file_mem_id == -1 ) { + if ( file_id == -1 ) { + if ( find_deep_entry( path_str, FileSystemBaseROTable, ScratchROTable ) == FILESYS_SEARCH_FAILED ) { + return __WASI_ERRNO_NOENT; + } + for ( int32_t i = 0; i < N_FILES; ++i ) { - if ( files[i].num_fds == 0 ) { + if ( !files[i].in_use ) { file_id = i; + files[i].in_use = true; break; } } @@ -1149,37 +1296,51 @@ int32_t path_open( int32_t fd, return __WASI_ERRNO_NFILE; } - for ( int32_t i = File0ROMem; i < NUM_RO_MEM; ++i ) { - if ( !bitset_get( ro_mem_use, (uint32_t)i ) ) { - file_mem_id = i; - bitset_set( ro_mem_use, (uint32_t)i ); - break; - } - } - if ( find_deep_entry( path_str, FileSystemBaseROTable, ScratchROTable ) == FILESYS_SEARCH_FAILED ) { - return __WASI_ERRNO_NOENT; - } + files[file_id].num_fds = 0; + files[file_id].name = path_str; if ( is_dir( ScratchROTable ) ) { - return __WASI_ERRNO_NOENT; - } + files[file_id].is_dir = true; + files[file_id].mem_id = -1; + } else { + files[file_id].is_dir = false; + + for ( int32_t i = File0ROMem; i < NUM_RO_MEM; ++i ) { + if ( !bitset_get( ro_mem_use, (uint32_t)i ) ) { + file_mem_id = i; + bitset_set( ro_mem_use, (uint32_t)i ); + break; + } + } - attach_blob_ro_mem( file_mem_id, get_ro_table( ScratchROTable, DIRENT_CONTENT ) ); + if ( file_mem_id == -1 ) { + return __WASI_ERRNO_NFILE; + } - files[file_id].mem_id = file_mem_id; - files[file_id].name = path_str; + attach_blob_ro_mem( file_mem_id, get_ro_table( ScratchROTable, DIRENT_CONTENT ) ); + files[file_id].mem_id = file_mem_id; + } } + files[file_id].num_fds++; fds[result_fd].offset = 0; - fds[result_fd].size = byte_size_ro_mem( file_mem_id ); + fds[result_fd].size = files[file_id].is_dir ? 4096 : byte_size_ro_mem( file_mem_id ); fds[result_fd].open = true; fds[result_fd].file_id = file_id; - fds[result_fd].stat.fs_filetype = __WASI_FILETYPE_REGULAR_FILE; - fds[result_fd].stat.fs_flags = __WASI_FDFLAGS_APPEND; - fds[result_fd].stat.fs_rights_base = __WASI_RIGHTS_FD_READ | __WASI_RIGHTS_FD_SEEK; - fds[result_fd].stat.fs_rights_inheriting = __WASI_RIGHTS_FD_READ | __WASI_RIGHTS_FD_SEEK; + if ( files[file_id].is_dir ) { + fds[result_fd].stat.fs_filetype = __WASI_FILETYPE_DIRECTORY; + fds[result_fd].stat.fs_flags = 0; + fds[result_fd].stat.fs_rights_base = __WASI_RIGHTS_FD_READ | __WASI_RIGHTS_FD_WRITE | __WASI_RIGHTS_FD_READDIR; + fds[result_fd].stat.fs_rights_inheriting + = __WASI_RIGHTS_FD_READ | __WASI_RIGHTS_FD_WRITE | __WASI_RIGHTS_FD_READDIR; + } else { + fds[result_fd].stat.fs_filetype = __WASI_FILETYPE_REGULAR_FILE; + fds[result_fd].stat.fs_flags = __WASI_FDFLAGS_APPEND; + fds[result_fd].stat.fs_rights_base = __WASI_RIGHTS_FD_READ | __WASI_RIGHTS_FD_SEEK; + fds[result_fd].stat.fs_rights_inheriting = __WASI_RIGHTS_FD_READ | __WASI_RIGHTS_FD_SEEK; + } flatware_mem_to_program_mem( retptr0, (int32_t)&result_fd, sizeof( result_fd ) ); RET_TRACE( result_fd ); @@ -1202,7 +1363,7 @@ int32_t clock_res_get( int32_t id, int32_t retptr0 ) /** * @brief Gets the time of a clock. - * + * * @param id Clock ID * @param precision Maximum permitted error in nanoseconds * @param retptr0 Returns time as int64_t @@ -1217,7 +1378,7 @@ int32_t clock_time_get( int32_t id, int64_t precision, int32_t retptr0 ) /** * @brief Concurrently polls for a set of events - * + * * @param in The events to subscribe to (array of __wasi_subscription_t) * @param out The events that have occurred (array of __wasi_event_t) * @param nsubscriptions Number of subscriptions @@ -1233,7 +1394,7 @@ int32_t poll_oneoff( int32_t in, int32_t out, int32_t nsubscriptions, int32_t re /** * @brief Yields the execution of the current process. - * + * * @return int32_t Status code */ int32_t sched_yield( void ) @@ -1269,7 +1430,7 @@ int32_t random_get( int32_t buf, int32_t buf_len ) /** * @brief Accepts a new connection on a socket. - * + * * @param fd File descriptor * @param flags Flags * @param retptr0 New file descriptor @@ -1307,7 +1468,7 @@ int32_t sock_recv( int32_t fd, /** * @brief Sends a message on a socket. - * + * * @param fd File descriptor of socket * @param si_data Array of __wasi_ciovec_t structs containing buffers to send * @param si_data_len Number of buffers in si_data @@ -1337,8 +1498,8 @@ int32_t sock_shutdown( int32_t fd, int32_t how ) } /** - * @brief Gets the value of the bitset at the given index. - * + * @brief Gets the value of the bitset at the given index. + * * @param bitset Bitset * @param index Index * @return true Index is set @@ -1351,7 +1512,7 @@ static bool bitset_get( unsigned int* bitset, uint32_t index ) /** * @brief Sets the value of the bitset at the given index. - * + * * @param bitset Bitset * @param index Index */ @@ -1360,10 +1521,16 @@ static void bitset_set( unsigned int* bitset, uint32_t index ) bitset[index / sizeof( unsigned int )] |= 1 << ( index % sizeof( unsigned int ) ); } -// static void bitset_clear( unsigned int* bitset, uint32_t index ) -// { -// bitset[index / sizeof( unsigned int )] &= ~( 1 << ( index % sizeof( unsigned int ) ) ); -// } +/** + * @brief Clears the value of the bitset at the given index. + * + * @param bitset Bitset + * @param index Index + */ +static void bitset_clear( unsigned int* bitset, uint32_t index ) +{ + bitset[index / sizeof( unsigned int )] &= ~( 1 << ( index % sizeof( unsigned int ) ) ); +} externref fixpoint_apply( externref encode ) { @@ -1388,8 +1555,8 @@ externref fixpoint_apply( externref encode ) set_rw_table( OutputRWTable, OUTPUT_FILESYSTEM, create_tree_rw_table( FileSystemRWTable, size_rw_table( FileSystemRWTable ) ) ); - set_rw_table( OutputRWTable, OUTPUT_STDOUT, create_blob_rw_mem( StdOutRWMem, fds[STDOUT].offset ) ); - set_rw_table( OutputRWTable, OUTPUT_STDERR, create_blob_rw_mem( StdErrRWMem, fds[STDERR].offset ) ); + set_rw_table( OutputRWTable, OUTPUT_STDOUT, create_blob_rw_mem( StdOutRWMem, (int32_t)fds[STDOUT].offset ) ); + set_rw_table( OutputRWTable, OUTPUT_STDERR, create_blob_rw_mem( StdErrRWMem, (int32_t)fds[STDERR].offset ) ); set_rw_table( OutputRWTable, OUTPUT_TRACE, create_blob_rw_mem( StdTraceRWMem, trace_offset ) ); free( ro_mem_use ); diff --git a/applications/flatware/examples/open/open-deep.c b/applications/flatware/examples/open/open-deep.c index 7ea10226..895e4da7 100644 --- a/applications/flatware/examples/open/open-deep.c +++ b/applications/flatware/examples/open/open-deep.c @@ -41,7 +41,7 @@ int main() printf( "fd_3 = %d\n", fd_3 ); - if ( fd_3 >= 0 ) { + if ( fd_3 < 0 ) { return -1; } diff --git a/applications/flatware/examples/python/CMakeLists.txt b/applications/flatware/examples/python/CMakeLists.txt index f50af459..727ffb89 100644 --- a/applications/flatware/examples/python/CMakeLists.txt +++ b/applications/flatware/examples/python/CMakeLists.txt @@ -1,13 +1,19 @@ file( DOWNLOAD - https://github.com/vmware-labs/webassembly-language-runtimes/releases/download/python%2F3.12.0%2B20231211-040d5a6/python-3.12.0.wasm - ${CMAKE_CURRENT_BINARY_DIR}/python.wasm + https://github.com/vmware-labs/webassembly-language-runtimes/releases/download/python%2F3.12.0%2B20231211-040d5a6/python-3.12.0-wasi-sdk-20.0.tar.gz + ${CMAKE_CURRENT_BINARY_DIR}/python.tar.gz SHOW_PROGRESS ) +file( + ARCHIVE_EXTRACT + INPUT ${CMAKE_CURRENT_BINARY_DIR}/python.tar.gz + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/python-workingdir +) + add_custom_command( OUTPUT "python-fixpoint.wasm" - DEPENDS python.wasm + DEPENDS python-workingdir/bin/python-3.12.0.wasm ../../c-flatware.c ../../filesys.c ../../asm-flatware.wat @@ -16,7 +22,7 @@ add_custom_command( COMMAND $ENV{HOME}/wasm-toolchain/wasm-tools/build/src/module-combiner/wasmlink --enable-multi-memory --enable-exceptions - python.wasm + python-workingdir/bin/python-3.12.0.wasm ../../wasi_snapshot_preview1.wasm -m wasi_command -n wasi_snapshot_preview1 diff --git a/etc/tests.cmake b/etc/tests.cmake index a3e0fe89..5ea2d210 100644 --- a/etc/tests.cmake +++ b/etc/tests.cmake @@ -14,7 +14,7 @@ add_custom_target (all-local-fixpoint-check COMMAND ${CMAKE_CTEST_COMMAND} --out COMMENT "Testing Fix..." ) -add_custom_target (flatware-check COMMAND ${CMAKE_CTEST_COMMAND} --output-on-failure -R "^f_" -E "f_python_flatware" -E "f_build_python" +add_custom_target (flatware-check COMMAND ${CMAKE_CTEST_COMMAND} --output-on-failure -R "^f_" -E "f_python_flatware" COMMENT "Testing Flatware..." ) add_custom_target (all-flatware-check COMMAND ${CMAKE_CTEST_COMMAND} --output-on-failure -R "^f_" COMMENT "Testing Flatware...") @@ -59,7 +59,7 @@ add_test(NAME f_return_flatware WORKING_DIRECTORY COMMAND ${CMAKE_CURRENT_BINARY add_test(NAME f_helloworld_flatware WORKING_DIRECTORY COMMAND ${CMAKE_CURRENT_BINARY_DIR}/src/tests/test-helloworld-flatware) add_test(NAME f_open_flatware WORKING_DIRECTORY COMMAND ${CMAKE_CURRENT_BINARY_DIR}/src/tests/test-open-flatware) -add_test(NAME f_build_python WORKING_DIRECTORY COMMAND ${CMAKE_COMMAND} --build ${CMAKE_BINARY_DIR}/applications-prefix/src/applications-build/ --target python_fixpoint --parallel) +add_test(NAME s_build_python WORKING_DIRECTORY COMMAND ${CMAKE_COMMAND} --build ${CMAKE_BINARY_DIR}/applications-prefix/src/applications-build/ --target python_fixpoint --parallel) add_test(NAME f_python_flatware WORKING_DIRECTORY COMMAND ${CMAKE_CURRENT_BINARY_DIR}/src/tests/test-python-flatware) -set_tests_properties(f_build_python PROPERTIES FIXTURES_SETUP python_fixture) +set_tests_properties(s_build_python PROPERTIES FIXTURES_SETUP python_fixture) set_tests_properties(f_python_flatware PROPERTIES FIXTURES_REQUIRED python_fixture) diff --git a/src/handle/blob.hh b/src/handle/blob.hh index 30843c16..ff192044 100644 --- a/src/handle/blob.hh +++ b/src/handle/blob.hh @@ -117,6 +117,10 @@ public: content[30] |= ( 1 << 7 ); } + Handle() + : Handle( 0, 64 ) + {} + inline u8x32 hash() const { u64x4 hash = (u64x4)content; diff --git a/src/runtime/dependency_graph.hh b/src/runtime/dependency_graph.hh index 94fe8691..f1856ed7 100644 --- a/src/runtime/dependency_graph.hh +++ b/src/runtime/dependency_graph.hh @@ -73,8 +73,7 @@ public: if ( backward_dependencies_.contains( task_or_object ) ) { for ( const auto dependent : backward_dependencies_[task_or_object] ) { auto& target = forward_dependencies_[dependent]; - target.erase( task_or_object ); - if ( target.empty() ) { + if ( target.erase( task_or_object ) && target.empty() ) { VLOG( 2 ) << "resuming " << dependent; unblocked.insert( dependent ); forward_dependencies_.erase( dependent ); diff --git a/src/runtime/executor.cc b/src/runtime/executor.cc index ce6bc217..a7c10630 100644 --- a/src/runtime/executor.cc +++ b/src/runtime/executor.cc @@ -2,10 +2,12 @@ #include #include #include +#include #include #include "executor.hh" #include "fixpointapi.hh" +#include "handle.hh" #include "overload.hh" #include "resource_limits.hh" #include "storage_exception.hh" @@ -21,6 +23,7 @@ Executor::Executor( Relater& parent, size_t threads, optional : make_shared( parent.labeled( "compile-elf" ), parent.labeled( "compile-fixed-point" ) ) ) { + fixpoint::storage = &parent_.storage_; for ( size_t i = 0; i < threads; i++ ) { threads_.emplace_back( [&]() { fixpoint::storage = &parent_.storage_; @@ -91,7 +94,39 @@ Result Executor::apply( Handle combination ) VLOG( 2 ) << "Apply " << combination; TreeData tree = parent_.storage_.get( combination ); + auto rlimits = tree->at( 0 ); + + VLOG( 2 ) << combination << " rlimits are " << rlimits; + auto limits = rlimits.unwrap() + .unwrap() + .try_into() + .and_then( [&]( auto x ) { return x.template try_into(); } ) + .transform( [&]( auto x ) { return fixpoint::storage->get( x ); } ); + + auto requested + = limits + .and_then( [&]( auto x ) { + return handle::extract( + x->at( 0 ).template unwrap().template unwrap().template unwrap() ); + } ) + .transform( [&]( auto x ) { return uint64_t( x ); } ) + .value_or( 0 ); + + { + auto w = parent_.available_memory_.write(); + Handle apply + = Handle( Handle( Handle( Handle( combination ) ) ) ); + VLOG( 2 ) << "Occupying " << apply << " " << w.get() << " " << requested; + if ( w.get() < requested ) { + VLOG( 1 ) << "Out of memory " << w.get() << " " << requested; + return {}; + } + + w.get() -= requested; + } + auto result = runner_->apply( combination, tree ); + parent_.available_memory_.write().get() += requested; return result; } @@ -117,6 +152,17 @@ std::optional> Executor::get( Handle name ) return {}; } +void Executor::retry( Handle name ) +{ + if ( threads_.size() == 0 ) { + throw HandleNotFound( name ); + } + + auto graph = parent_.graph_.write(); + graph->start( name ); + todo_.push( name ); +} + std::optional> Executor::get_handle( Handle ) { return {}; diff --git a/src/runtime/executor.hh b/src/runtime/executor.hh index ac33926b..3e61a0db 100644 --- a/src/runtime/executor.hh +++ b/src/runtime/executor.hh @@ -42,6 +42,7 @@ public: virtual std::optional get( Handle name ) override; virtual std::optional get( Handle name ) override; virtual std::optional> get( Handle name ) override; + void retry( Handle name ); virtual std::optional> get_handle( Handle name ) override; virtual std::optional get_shallow( Handle name ) override; virtual void put( Handle name, BlobData data ) override; diff --git a/src/runtime/network.cc b/src/runtime/network.cc index f861e2ca..e0c97260 100644 --- a/src/runtime/network.cc +++ b/src/runtime/network.cc @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -6,7 +7,6 @@ #include #include #include -#include #include #include @@ -14,11 +14,8 @@ #include "eventloop.hh" #include "handle.hh" -#include "handle_post.hh" -#include "handle_util.hh" #include "message.hh" #include "network.hh" -#include "object.hh" #include "types.hh" using namespace std; @@ -347,9 +344,7 @@ std::optional Remote::get_info() return info_; } -Remote::Remote( EventLoop& events, - EventCategories categories, - TCPSocket socket, +Remote::Remote( TCPSocket socket, size_t index, MessageQueue& msg_q, optional> parent ) @@ -357,6 +352,15 @@ Remote::Remote( EventLoop& events, , msg_q_( msg_q ) , parent_( parent ) , index_( index ) +{} + +Remote::Remote( EventLoop& events, + EventCategories categories, + TCPSocket socket, + size_t index, + MessageQueue& msg_q, + optional> parent ) + : Remote( move( socket ), index, msg_q, parent ) { install_rule( events.add_rule( categories.rx_read_data, @@ -394,6 +398,63 @@ Remote::Remote( EventLoop& events, push_message( { Opcode::REQUESTINFO, string( "" ) } ); } +DataServer::DataServer( EventLoop& events, + EventCategories categories, + TCPSocket socket, + size_t index, + MessageQueue& msg_q, + optional> parent ) + : Remote( move( socket ), index, msg_q, parent ) +{ + install_rule( events.add_rule( + categories.rx_read_data, + socket_, + Direction::In, + [&] { rx_data_.push_from_fd( socket_ ); }, + [&] { return rx_data_.can_write(); }, + [&] { this->clean_up(); } ) ); + + install_rule( events.add_rule( + categories.tx_write_data, + socket_, + Direction::Out, + [&] { tx_data_.pop_to_fd( socket_ ); }, + [&] { return tx_data_.can_read(); }, + [&] { this->clean_up(); } ) ); + + install_rule( + events.add_rule( categories.rx_parse_msg, [&] { read_from_rb(); }, [&] { return rx_data_.can_read(); } ) ); + + install_rule( events.add_rule( + categories.tx_serialize_msg, + [&] { write_to_rb(); }, + [&] { return not tx_messages_.empty() and tx_data_.can_write(); } ) ); + + install_rule( events.add_rule( + categories.rx_process_msg, + [&] { + IncomingMessage message = move( rx_messages_.front() ); + rx_messages_.pop(); + process_incoming_message( move( message ) ); + }, + [&] { return not rx_messages_.empty(); } ) ); + + install_rule( events.add_rule( + categories.data_server_ready, + [&] { + auto res = ready_.pop(); + if ( res.has_value() ) { + auto blob = parent_.value().get().get( res.value() ); + if ( blob.has_value() ) { + send_blob( blob.value() ); + } + } + }, + [&] { return ready_.size_approx() > 0; } ) ); + + // push_message( { Opcode::REQUESTINFO, string( "" ) } ); +} + void Remote::process_incoming_message( IncomingMessage&& msg ) { if ( !parent_.has_value() ) @@ -407,6 +468,7 @@ void Remote::process_incoming_message( IncomingMessage&& msg ) case Opcode::RUN: { auto payload = parse( std::get( msg.payload() ) ); auto task = payload.task; + VLOG( 1 ) << "RUN " << payload.task; { unique_lock lock( mutex_ ); reply_to_.insert( task ); @@ -436,11 +498,6 @@ void Remote::process_incoming_message( IncomingMessage&& msg ) case Opcode::INFO: { auto payload = parse( std::get( msg.payload() ) ); - { - unique_lock lock( mutex_ ); - info_ = { .parallelism = payload.parallelism, .link_speed = payload.link_speed }; - info_cv_.notify_all(); - } for ( auto handle : payload.data ) { handle.visit( overload { @@ -460,6 +517,12 @@ void Remote::process_incoming_message( IncomingMessage&& msg ) } ); } + { + unique_lock lock( mutex_ ); + info_ = { .parallelism = payload.parallelism, .link_speed = payload.link_speed }; + info_cv_.notify_all(); + } + break; } @@ -505,6 +568,7 @@ void Remote::process_incoming_message( IncomingMessage&& msg ) case Opcode::LOADBLOB: { auto payload = parse( std::get( msg.payload() ) ); + VLOG( 1 ) << "LOADBLOB " << payload.handle.unwrap(); if ( parent.contains( payload.handle.unwrap() ) ) { parent.get( payload.handle.unwrap() ); } @@ -582,35 +646,14 @@ void Remote::process_incoming_message( IncomingMessage&& msg ) for ( const auto& h : handles ) { VLOG( 2 ) << "Sending " << handle::fix( h ); std::visit( overload { - [&]( Handle n ) { - if ( !contains( n ) ) { - push_message( { Opcode::BLOBDATA, parent.get( n ).value() } ); - add_to_view( n ); - } - }, - [&]( Handle t ) { - if ( !contains( t ) ) { - push_message( { Opcode::TREEDATA, parent.get( t ).value() } ); - add_to_view( t ); - } - }, + [&]( Handle n ) { push_message( { Opcode::BLOBDATA, parent.get( n ).value() } ); }, + [&]( Handle t ) { push_message( { Opcode::TREEDATA, parent.get( t ).value() } ); }, []( Handle ) {}, []( Handle ) {}, }, h.get() ); } - // Any objects in this proposal are considered "exising" on the remote side - for ( const auto& [h, _] : *proposed_proposals_.front().second ) { - std::visit( overload { - [&]( Handle h ) { add_to_view( h ); }, - [&]( Handle t ) { add_to_view( t ); }, - []( Handle ) {}, - []( Handle ) {}, - }, - h.get() ); - } - proposed_proposals_.pop(); if ( result ) { @@ -648,6 +691,88 @@ void Remote::process_incoming_message( IncomingMessage&& msg ) return; } +void DataServer::run_after( function fn ) +{ + if ( DataServer::latency == 0 ) { + fn(); + } else { + threads_.push_back( thread( [fn]() { + this_thread::sleep_for( chrono::microseconds( latency ) ); + fn(); + } ) ); + } +} + +void DataServer::process_incoming_message( IncomingMessage&& msg ) +{ + if ( !parent_.has_value() ) + return; + + auto& parent = parent_.value().get(); + + VLOG( 1 ) << "process_incoming_message " << Message::OPCODE_NAMES[static_cast( msg.opcode() )]; + + switch ( msg.opcode() ) { + case Opcode::ACCEPT_TRANSFER: + case Opcode::PROPOSE_TRANSFER: + case Opcode::SHALLOWTREEDATA: + case Opcode::BLOBDATA: + case Opcode::TREEDATA: + case Opcode::INFO: + case Opcode::RESULT: + case Opcode::RUN: { + break; + } + + case Opcode::REQUESTINFO: { + auto parent_info = parent.get_info().value_or( IRuntime::Info { .parallelism = 0, .link_speed = 0 } ); + InfoPayload payload { + .parallelism = parent_info.parallelism, .link_speed = parent_info.link_speed, .data = parent.data() }; + push_message( OutgoingMessage::to_message( move( payload ) ) ); + break; + } + + case Opcode::REQUESTTREE: { + break; + } + + case Opcode::REQUESTBLOB: { + auto payload = parse( std::get( msg.payload() ) ); + auto h = payload.handle; + VLOG( 1 ) << "REQUESTBLOB " << h; + auto fn = [&, h]() { ready_.push( h ); }; + + run_after( fn ); + break; + } + + case Opcode::REQUESTSHALLOWTREE: { + break; + } + + case Opcode::LOADBLOB: { + auto payload = parse( std::get( msg.payload() ) ); + if ( parent.contains( payload.handle.unwrap() ) ) { + parent.get( payload.handle.unwrap() ); + } + break; + } + + case Opcode::LOADTREE: { + auto payload = parse( std::get( msg.payload() ) ); + if ( parent.contains( payload.handle ) ) { + parent.get( payload.handle ); + } + break; + } + + default: + throw runtime_error( "Invalid message Opcode" ); + } + + return; +} + void Remote::clean_up() { // Reset info @@ -673,7 +798,15 @@ Remote::~Remote() } } -void NetworkWorker::process_outgoing_message( size_t remote_idx, MessagePayload&& payload ) +DataServer::~DataServer() +{ + for ( auto& t : threads_ ) { + t.join(); + } +} + +template +void NetworkWorker::process_outgoing_message( size_t remote_idx, MessagePayload&& payload ) { if ( !connections_.read()->contains( remote_idx ) ) { if ( holds_alternative( payload ) ) { @@ -692,6 +825,7 @@ void NetworkWorker::process_outgoing_message( size_t remote_idx, MessagePayload& VLOG( 2 ) << "Adding " << b.first << " to proposal " << remote_idx; connection.incomplete_proposal_->push_back( { b.first, b.second } ); connection.proposal_size_ += b.second->size(); + connection.add_to_view( b.first ); } }, [&]( TreeDataPayload t ) { @@ -700,6 +834,7 @@ void NetworkWorker::process_outgoing_message( size_t remote_idx, MessagePayload& connection.incomplete_proposal_->push_back( { visit( []( auto h ) -> Handle { return h; }, t.first.get() ), t.second } ); connection.proposal_size_ += t.second->size() * sizeof( Handle ); + connection.add_to_view( t.first ); } }, [&]( RunPayload r ) { @@ -792,14 +927,14 @@ void NetworkWorker::process_outgoing_message( size_t remote_idx, MessagePayload& connection.proposal_size_ = 0; } }, - [&]( LoadBlobPayload&& payload ) { + [&]( LoadBlobPayload payload ) { auto named = payload.handle.unwrap(); if ( connection.contains( named ) && !connection.loaded( named ) ) { connection.add_to_view( named ); connection.push_message( OutgoingMessage::to_message( move( payload ) ) ); } }, - [&]( LoadTreePayload&& payload ) { + [&]( LoadTreePayload payload ) { if ( connection.contains( payload.handle ) && !connection.loaded( payload.handle ) ) { connection.add_to_view( payload.handle ); connection.push_message( OutgoingMessage::to_message( move( payload ) ) ); @@ -810,7 +945,8 @@ void NetworkWorker::process_outgoing_message( size_t remote_idx, MessagePayload& } } -void NetworkWorker::run_loop() +template +void NetworkWorker::run_loop() { categories_ = { .server_new_socket = events_.add_category( "server - new socket" ), @@ -822,6 +958,7 @@ void NetworkWorker::run_loop() .tx_serialize_msg = events_.add_category( "tx - serialize message" ), .tx_write_data = events_.add_category( "tx - write" ), .forward_msg = events_.add_category( "networkworker - forward msg to remote" ), + .data_server_ready = events_.add_category( "networkworker - forward msg to remote" ), }; // When we have a new server socket, add it to the event loop events_.add_rule( @@ -835,7 +972,7 @@ void NetworkWorker::run_loop() events_.add_rule( categories_.server_new_connection, server_socket, Direction::In, [&] { connections_.write()->emplace( next_connection_id_, - make_unique( + make_unique( events_, categories_, server_socket.accept(), next_connection_id_, msg_q_, parent_ ) ); { @@ -862,7 +999,7 @@ void NetworkWorker::run_loop() TCPSocket client_socket = *connecting_sockets_.pop(); connections_.write()->emplace( next_connection_id_, - make_unique( + make_unique( events_, categories_, std::move( client_socket ), next_connection_id_, msg_q_, parent_ ) ); addresses_.write()->emplace( connections_.read()->at( next_connection_id_ )->peer_address().to_string(), diff --git a/src/runtime/network.hh b/src/runtime/network.hh index 54b7f821..d785796b 100644 --- a/src/runtime/network.hh +++ b/src/runtime/network.hh @@ -36,15 +36,22 @@ struct EventCategories size_t tx_serialize_msg; size_t tx_write_data; size_t forward_msg; + size_t data_server_ready; }; +template class NetworkWorker; +class Remote; +class DataServer; + class Remote : public IRuntime { - friend class NetworkWorker; + friend class NetworkWorker; + friend class NetworkWorker; static constexpr size_t STORAGE_SIZE = 65536; +protected: TCPSocket socket_; MessageQueue& msg_q_; @@ -76,7 +83,7 @@ class Remote : public IRuntime std::queue, std::optional>>, std::unique_ptr>> proposed_proposals_ {}; - FixTable, AbslHash> blobs_view_ { 100000 }; + FixTable, AbslHash> blobs_view_ { 1000000 }; FixTable, AbslHash, handle::any_tree_equal> trees_view_ { 1000000 }; FixTable, AbslHash> relations_view_ { 100000 }; @@ -88,6 +95,11 @@ public: MessageQueue& msg_q, std::optional> parent ); + Remote( TCPSocket socket, + size_t index, + MessageQueue& msg_q, + std::optional> parent ); + std::optional get( Handle name ) override; std::optional get( Handle name ) override; std::optional get_shallow( Handle ) override; @@ -124,7 +136,7 @@ public: } ~Remote(); -private: +protected: void load_tx_message(); void write_to_rb(); void read_from_rb(); @@ -145,6 +157,30 @@ private: void add_to_view( Handle handle ); }; +class DataServer : public Remote +{ + friend class NetworkWorker; + +private: + Channel> ready_ {}; + std::list threads_ {}; + void process_incoming_message( IncomingMessage&& msg ); + void run_after( std::function ); + +public: + inline static size_t latency = 0; + + DataServer( EventLoop& events, + EventCategories categories, + TCPSocket socket, + size_t index, + MessageQueue& msg_q, + std::optional> parent ); + + ~DataServer(); +}; + +template class NetworkWorker { private: @@ -169,7 +205,7 @@ private: void process_outgoing_message( size_t remote_id, MessagePayload&& message ); public: - SharedMutex>> connections_ {}; + SharedMutex>> connections_ {}; NetworkWorker( std::optional> parent = {} ) : parent_( parent ) @@ -219,3 +255,6 @@ public: return std::static_pointer_cast( rt ); } }; + +template class NetworkWorker; +template class NetworkWorker; diff --git a/src/runtime/pass.cc b/src/runtime/pass.cc index 34ad93bf..89e975c1 100644 --- a/src/runtime/pass.cc +++ b/src/runtime/pass.cc @@ -5,6 +5,7 @@ #include "overload.hh" #include "relater.hh" #include "scheduler.hh" +#include #include #include #include @@ -76,6 +77,9 @@ BasePass::BasePass( reference_wrapper relater ) auto locked_remote = remote.lock(); if ( locked_remote ) { auto info = locked_remote->get_info(); + if ( info.has_value() ) { + remotes_.push_back( locked_remote ); + } if ( info.has_value() and info->parallelism > 0 ) { available_remotes_.push_back( locked_remote ); } @@ -278,7 +282,7 @@ size_t BasePass::absent_size( std::shared_ptr worker, Handle void BasePass::all( Handle job ) { { - if ( local_->get_info().has_value() and local_->get_info()->parallelism > 0 ) { + if ( local_->get_info().has_value() ) { job.visit( overload { [&]( Handle ref ) { if ( local_->contains_shallow( relater_.get().unref( ref ) ) ) { @@ -299,9 +303,9 @@ void BasePass::all( Handle job ) } } - for ( const auto& remote : available_remotes_ ) { + for ( const auto& remote : remotes_ ) { auto info = remote->get_info(); - if ( info.has_value() and info->parallelism > 0 ) { + if ( info.has_value() ) { job.visit( overload { [&]( Handle ref ) { if ( remote->contains_shallow( relater_.get().unref( ref ) ) ) { @@ -388,7 +392,9 @@ void MinAbsentMaxParallelism::relation_post( Handle job, if ( !chosen_remotes_.contains( d ) ) { const auto& contains = base_.get().get_contains( d ); for ( const auto& s : contains ) { - present[s] += base_.get().get_output_size( d ); + if ( s->get_info()->parallelism > 0 ) { + present[s] += base_.get().get_output_size( d ); + } } } else { present[chosen_remotes_.at( d ).first] += base_.get().get_output_size( d ); @@ -403,6 +409,7 @@ void MinAbsentMaxParallelism::relation_post( Handle job, optional max_present_size; optional local_present_size; size_t max_parallelism = 0; + for ( const auto& [r, s] : present ) { if ( is_local( r ) ) { local_present_size = s; @@ -429,6 +436,11 @@ void MinAbsentMaxParallelism::relation_post( Handle job, } } + if ( present.size() == 0 ) { + chosen_remote = local_; + local_present_size = 0; + } + VLOG( 2 ) << "MinAbsent::post " << job << " " << chosen_remote.value() << " " << is_local( chosen_remote.value() ); if ( is_local( chosen_remote.value() ) ) { @@ -469,9 +481,13 @@ void ChildBackProp::all( Handle job ) return; } + if ( chosen_remotes_.contains( job ) && is_local( chosen_remotes_.at( job ).first ) ) { + return; + } + auto curr_output_size = base_.get().get_output_size( job ); bool move_to_local = false; - bool cont = job.visit( overload { + bool not_contained = job.visit( overload { [&]( auto r ) { const auto& contains = base_.get().get_contains( r ); if ( !contains.empty() && contains.contains( local_ ) ) { @@ -481,7 +497,7 @@ void ChildBackProp::all( Handle job ) }, } ); - if ( cont ) { + if ( not_contained ) { if ( is_local( chosen_remotes_.at( job ).first ) ) { return; } @@ -525,16 +541,24 @@ void ChildBackProp::relation_post( Handle job, const absl::flat_hash_s }, } ); - VLOG( 2 ) << "ChildBackProp::relation_post " << undo; + VLOG( 2 ) << "ChildBackProp::relation_post " << job << " " << undo; if ( undo ) { if ( dependencies.size() == 1 ) { auto d = *dependencies.begin(); if ( chosen_remotes_.contains( d ) ) { - chosen_remotes_.insert_or_assign( job, { chosen_remotes_.at( d ).first, chosen_remotes_.at( d ).second } ); + if ( chosen_remotes_.at( d ).first->get_info()->parallelism > 0 ) { + chosen_remotes_.insert_or_assign( job, + { chosen_remotes_.at( d ).first, chosen_remotes_.at( d ).second } ); + } else { + chosen_remotes_.insert_or_assign( job, { local_, chosen_remotes_.at( d ).second } ); + } } else { if ( !base_.get().get_contains( d ).contains( local_ ) ) { - chosen_remotes_.insert_or_assign( job, { *base_.get().get_contains( d ).begin(), 0 } ); + auto rt = *base_.get().get_contains( d ).begin(); + if ( rt->get_info()->parallelism > 0 ) { + chosen_remotes_.insert_or_assign( job, { rt, 0 } ); + } } } } else { @@ -556,11 +580,21 @@ void ChildBackProp::relation_post( Handle job, const absl::flat_hash_s chosen_remotes_.insert_or_assign( job, { remote.value(), 0 } ); } } + } else { + for ( auto d : dependencies ) { + if ( chosen_remotes_.contains( d ) && is_local( chosen_remotes_.at( d ).first ) ) { + chosen_remotes_.at( job ).second += chosen_remotes_.at( d ).second; + } + } } } void InOutSource::relation_pre( Handle, const absl::flat_hash_set>& dependencies ) { + if ( base_.get().get_available_remotes().size() == 0 ) { + return; + } + bool any_ep = false; for ( auto d : dependencies ) { if ( base_.get().get_ep( d ) ) { @@ -571,12 +605,12 @@ void InOutSource::relation_pre( Handle, const absl::flat_hash_set, greater> scores; + multimap> scores; for ( auto d : dependencies ) { if ( chosen_remotes_.contains( d ) and is_local( chosen_remotes_.at( d ).first ) ) { assigned += 1; @@ -598,36 +632,28 @@ void InOutSource::relation_pre( Handle, const absl::flat_hash_set job ) -{ - // XXX - const auto& available_remotes = base_.get().get_available_remotes(); - size_t rand_idx = rand() % available_remotes.size(); - chosen_remotes_.insert_or_assign( job, { available_remotes[rand_idx], 0 } ); -} - void SendToRemotePass::all( Handle job ) { if ( !is_local( chosen_remotes_.at( job ).first ) ) { VLOG( 1 ) << "Run job remotely " << job << endl; - remote_jobs_[chosen_remotes_.at( job ).first].insert( job ); + remote_jobs_[chosen_remotes_.at( job ).first].insert( { chosen_remotes_.at( job ).second, job } ); } } @@ -638,11 +664,11 @@ void SendToRemotePass::relation_pre( Handle, const absl::flat_hash_set if ( contains.empty() ) { if ( !is_local( chosen_remotes_.at( d ).first ) ) { VLOG( 1 ) << "Run job remotely " << d << endl; - remote_jobs_[chosen_remotes_.at( d ).first].insert( d ); + remote_jobs_[chosen_remotes_.at( d ).first].insert( { chosen_remotes_.at( d ).second, d } ); } } else { if ( !contains.contains( local_ ) ) { - remote_jobs_[*contains.begin()].insert( d ); + remote_jobs_[*contains.begin()].insert( { numeric_limits::max(), d } ); } } } @@ -696,10 +722,12 @@ void SendToRemotePass::send_remote_jobs( Handle root ) { this->run( root ); - vector, absl::flat_hash_set>::const_iterator>> iterators {}; + vector< + pair, absl::btree_multimap, greater>::const_iterator>> + iterators {}; for ( const auto& [rt, handles] : remote_jobs_ ) { iterators.push_back( make_pair( rt, handles.begin() ) ); - for ( auto h : handles ) { + for ( auto [_, h] : handles ) { send_job_dependencies( rt, h ); } } @@ -726,7 +754,7 @@ void SendToRemotePass::send_remote_jobs( Handle root ) while ( finished.size() < iterators.size() ) { auto& [r, it] = iterators.at( iterator_index ); if ( it != remote_jobs_.at( r ).end() ) { - auto h = *it; + auto h = ( *it ).second; h.visit( overload { [&]( Handle d ) { r->get( d ); @@ -769,17 +797,8 @@ void FinalPass::relation_pre( Handle job, const absl::flat_hash_set> unblocked; sch_.get().merge_sketch_graph( job, unblocked ); - if ( unblocked.size() == 1 && !todo_.has_value() ) { - auto todo = *unblocked.begin(); - if ( is_local( chosen_remotes_.at( todo ).first ) ) { - todo_ = todo; - } - } else { - for ( auto r : unblocked ) { - if ( is_local( chosen_remotes_.at( r ).first ) ) { - chosen_remotes_.at( r ).first->get( r ); - } - } + for ( auto u : unblocked ) { + unblocked_.insert( { chosen_remotes_.at( u ).second, u } ); } } @@ -823,12 +842,6 @@ optional> PassRunner::run( reference_wrapper rt, selection.value()->run( top_level_job ); break; } - - case PassType::Random: { - selection = make_unique( base, rt ); - selection.value()->run( top_level_job ); - break; - } } } @@ -841,20 +854,29 @@ optional> PassRunner::run( reference_wrapper rt, FinalPass final( base, rt, sch, move( selection.value() ) ); final.run( top_level_job ); - if ( final.get_todo().has_value() ) { - auto r = final.get_todo().value(); - return r.visit>>( overload { [&]( Handle ) -> optional> { - rt.get().get_local()->get( r ); - return {}; - }, - [&]( Handle s ) -> optional> { - if ( top_level_job == Handle( r ) ) { - return s.unwrap(); - } else { - rt.get().get_local()->get( r ); - return {}; - } - } } ); + auto& unblocked = final.get_unblocked(); + if ( unblocked.size() == 1 ) { + auto r = ( *unblocked.begin() ).second; + if ( is_local( final.chosen_remotes_.at( r ).first ) ) { + return r.visit>>( overload { [&]( Handle ) -> optional> { + rt.get().get_local()->get( r ); + return {}; + }, + [&]( Handle s ) -> optional> { + if ( top_level_job == Handle( r ) ) { + return s.unwrap(); + } else { + rt.get().get_local()->get( r ); + return {}; + } + } } ); + } + } else { + for ( auto [_, u] : unblocked ) { + if ( is_local( final.chosen_remotes_.at( u ).first ) ) { + rt.get().get_local()->get( u ); + } + } } return {}; diff --git a/src/runtime/pass.hh b/src/runtime/pass.hh index b5405b78..f7236f72 100644 --- a/src/runtime/pass.hh +++ b/src/runtime/pass.hh @@ -3,6 +3,7 @@ #include "handle.hh" #include "interface.hh" #include "relater.hh" +#include #include #include #include @@ -59,6 +60,7 @@ private: size_t absent_size( std::shared_ptr worker, Handle job ); std::vector> available_remotes_ {}; + std::vector> remotes_ {}; public: BasePass( std::reference_wrapper relater ); @@ -99,6 +101,8 @@ public: { return std::move( chosen_remotes_ ); }; + + friend class PassRunner; }; // Only operates on local paths, except that `independent` are always invoked on root job @@ -163,7 +167,7 @@ class InOutSource : public PrunedSelectionPass virtual void relation_pre( Handle, const absl::flat_hash_set>& ) override; virtual void data( Handle ) override {} - virtual void all( Handle ) override {}; + virtual void all( Handle ) override {} virtual void relation_post( Handle, const absl::flat_hash_set>& ) override {} public: @@ -174,23 +178,11 @@ public: {} }; -class RandomSelection : public SelectionPass -{ - virtual void relation_pre( Handle, const absl::flat_hash_set>& ) override {} - virtual void data( Handle ) override {} - virtual void relation_post( Handle, const absl::flat_hash_set>& ) override {} - - virtual void all( Handle ) override; - -public: - RandomSelection( std::reference_wrapper base, std::reference_wrapper relater ) - : SelectionPass( base, relater ) - {} -}; - class SendToRemotePass : public PrunedSelectionPass { - std::unordered_map, absl::flat_hash_set>> remote_jobs_ {}; + std::unordered_map, + absl::btree_multimap, std::greater>> + remote_jobs_ {}; std::unordered_map, absl::flat_hash_set>> remote_data_ {}; void send_job_dependencies( std::shared_ptr, Handle ); @@ -212,8 +204,8 @@ public: class FinalPass : public PrunedSelectionPass { - std::optional> todo_ {}; std::reference_wrapper sch_; + absl::btree_multimap, std::greater> unblocked_ {}; virtual void data( Handle ) override; virtual void relation_pre( Handle, const absl::flat_hash_set>& ) override; @@ -229,8 +221,7 @@ public: : PrunedSelectionPass( base, relater, move( prev ) ) , sch_( sch ) {} - - std::optional> get_todo() { return todo_; }; + absl::btree_multimap, std::greater>& get_unblocked() { return unblocked_; }; }; // A correct sequence of passes contains: BasePass + (n >= 1) * SelectionPass + (n >= 0) * PrunedSelectionPass + @@ -242,8 +233,7 @@ public: { MinAbsentMaxParallelism, ChildBackProp, - InOutSource, - Random + InOutSource }; static std::optional> run( std::reference_wrapper rt, diff --git a/src/runtime/relater.cc b/src/runtime/relater.cc index 854aec1c..eaee04bc 100644 --- a/src/runtime/relater.cc +++ b/src/runtime/relater.cc @@ -52,9 +52,14 @@ void Relater::get_from_repository( Handle handle ) } } -Relater::Relater( size_t threads, optional> runner, optional> scheduler ) - : scheduler_( scheduler.has_value() ? move( scheduler.value() ) : make_shared() ) +Relater::Relater( size_t threads, + optional> runner, + optional> scheduler, + optional repository_fix_table_size ) + : repository_( repository_fix_table_size.has_value() ? repository_fix_table_size.value() : 1000000 ) + , scheduler_( scheduler.has_value() ? move( scheduler.value() ) : make_shared() ) { + available_memory_.write().get() = sysconf( _SC_PHYS_PAGES ) * sysconf( _SC_PAGE_SIZE ); scheduler_->set_relater( *this ); local_ = make_shared( *this, threads, runner ); } @@ -87,6 +92,28 @@ Handle Relater::execute( Handle r ) } } +Handle Relater::direct_execute( Handle r ) +{ + if ( contains( r ) ) { + return get( r ).value().unwrap(); + } + + top_level = r; + bool expected = true; + if ( top_level_done_.compare_exchange_strong( expected, false ) ) { + if ( local_->get_info()->parallelism == 0 ) { + remotes_.read()->front().lock()->get( r ); + } else { + scheduler_->schedule( r ); + } + + top_level_done_.wait( false, std::memory_order_acquire ); + return result; + } else { + throw std::runtime_error( "Unexpected top level value." ); + } +} + bool Relater::finish_top_level( Handle name, Handle value ) { if ( !top_level_done_.load( std::memory_order_acquire ) and name == top_level ) { @@ -137,6 +164,23 @@ optional Relater::get_shallow( Handle name ) throw HandleNotFound( handle::fix( name ) ); } +optional Relater::get_shallow_tmp( Handle name ) +{ + if ( tmp_trees_.contains( name ) ) { + return tmp_trees_.get( name ); + } else if ( storage_.contains( name ) ) { + return storage_.get( name ); + } else if ( storage_.contains_shallow( name ) ) { + return storage_.get_shallow( name ); + } else if ( repository_.contains_shallow( name ) ) { + auto x = repository_.get( name ).value(); + tmp_trees_.insert( name, x ); + return x; + } + + throw HandleNotFound( handle::fix( name ) ); +} + optional> Relater::get( Handle name ) { if ( storage_.contains( name ) ) { diff --git a/src/runtime/relater.hh b/src/runtime/relater.hh index 665e644d..2b6f0d2d 100644 --- a/src/runtime/relater.hh +++ b/src/runtime/relater.hh @@ -1,10 +1,12 @@ #pragma once +#include "channel.hh" #include "dependency_graph.hh" #include "handle.hh" #include "repository.hh" #include "runner.hh" #include "runtimestorage.hh" +#include class Executor; class Scheduler; @@ -29,27 +31,36 @@ private: SharedMutex graph_ {}; RuntimeStorage storage_ {}; - Repository repository_ {}; + Repository repository_; std::shared_ptr scheduler_ {}; SharedMutex>> remotes_ {}; std::shared_ptr local_ {}; + SharedMutex available_memory_ {}; + + // tmp_trees_ holds Trees that only the first layers (the TreeData) are presenting in memory + FixTable tmp_trees_ { 10000 }; + template void get_from_repository( Handle handle ); public: Relater( size_t threads = std::thread::hardware_concurrency(), std::optional> runner = {}, - std::optional> scheduler = {} ); + std::optional> scheduler = {}, + std::optional repository_fix_table_size = {} ); virtual void add_worker( std::shared_ptr ) override; Handle execute( Handle x ); + Handle direct_execute( Handle x ); virtual std::optional get( Handle name ) override; virtual std::optional get( Handle name ) override; virtual std::optional> get( Handle name ) override; virtual std::optional get_shallow( Handle name ) override; + // Load only the first layer of a Tree + virtual std::optional get_shallow_tmp( Handle name ); virtual std::optional> get_handle( Handle name ) override; virtual void put( Handle name, BlobData data ) override; virtual void put( Handle name, TreeData data ) override; diff --git a/src/runtime/runtimes.cc b/src/runtime/runtimes.cc index 9ee2ebea..e44be22b 100644 --- a/src/runtime/runtimes.cc +++ b/src/runtime/runtimes.cc @@ -2,6 +2,7 @@ #include "handle.hh" #include "overload.hh" #include "types.hh" +#include #include #include @@ -59,16 +60,27 @@ Server::~Server() network_worker_->stop(); } +DataServerRT::~DataServerRT() +{ + network_worker_->stop(); +} + void Server::join() { network_worker_->join(); } +void DataServerRT::join() +{ + network_worker_->join(); +} + shared_ptr Server::init( const Address& address, shared_ptr scheduler, - vector
peer_servers ) + vector
peer_servers, + optional threads ) { - auto runtime = std::make_shared( scheduler ); + auto runtime = std::make_shared( scheduler, threads ); runtime->network_worker_.emplace( runtime->relater_ ); runtime->network_worker_->start(); runtime->network_worker_->start_server( address ); @@ -84,6 +96,15 @@ shared_ptr Server::init( const Address& address, return runtime; } +shared_ptr DataServerRT::init( const Address& address ) +{ + auto runtime = make_shared(); + runtime->network_worker_.emplace( runtime->relater_ ); + runtime->network_worker_->start(); + runtime->network_worker_->start_server( address ); + return runtime; +} + template void Client::send_job( Handle handle, unordered_set> visited ) { @@ -104,8 +125,10 @@ void Client::send_job( Handle handle, unordered_set> visited ) } else { if ( server_->contains( handle ) ) { // Load the data on the server side + VLOG( 3 ) << "send_job loading " << handle << " on server"; server_->put( handle, {} ); } else { + VLOG( 3 ) << "send_job sending " << handle << " on server"; if constexpr ( FixTreeType ) { if ( relater_.contains( handle ) ) { auto tree = relater_.get( handle ).value(); @@ -129,3 +152,9 @@ Handle Client::execute( Handle x ) send_job( x ); return relater_.execute( x ); } + +void Client::run( Handle x ) +{ + send_job( x ); + server_->get( x ); +} diff --git a/src/runtime/runtimes.hh b/src/runtime/runtimes.hh index b429f4e3..6196ef14 100644 --- a/src/runtime/runtimes.hh +++ b/src/runtime/runtimes.hh @@ -3,7 +3,6 @@ #include "handle.hh" #include "network.hh" #include "relater.hh" -#include "repository.hh" #include @@ -39,7 +38,7 @@ class Client : public FrontendRT { protected: Relater relater_ { 0 }; - std::optional network_worker_ {}; + std::optional> network_worker_ {}; std::shared_ptr server_ {}; template @@ -51,6 +50,7 @@ public: static std::shared_ptr init( const Address& address ); virtual Handle execute( Handle x ) override; + void run( Handle x ); IRuntime& get_rt() { return relater_; } std::shared_ptr& get_server() { return server_; } @@ -60,16 +60,32 @@ class Server { protected: Relater relater_; - std::optional network_worker_ {}; + std::optional> network_worker_ {}; public: - Server( std::shared_ptr scheduler ) - : relater_( std::thread::hardware_concurrency() - 1, {}, scheduler ) + Server( std::shared_ptr scheduler, std::optional threads = {} ) + : relater_( threads.has_value() ? threads.value() : std::thread::hardware_concurrency() - 1, {}, scheduler ) {} static std::shared_ptr init( const Address& address, std::shared_ptr scheduler, - const std::vector
peer_servers = {} ); + const std::vector
peer_servers = {}, + std::optional threads = {} ); void join(); ~Server(); }; + +class DataServerRT +{ +protected: + Relater relater_ { 0 }; + std::optional> network_worker_ {}; + +public: + DataServerRT() {} + ~DataServerRT(); + + static std::shared_ptr init( const Address& address ); + + void join(); +}; diff --git a/src/runtime/scheduler.cc b/src/runtime/scheduler.cc index 0b6d5385..68d3d26e 100644 --- a/src/runtime/scheduler.cc +++ b/src/runtime/scheduler.cc @@ -43,7 +43,9 @@ SketchGraphScheduler::Result SketchGraphScheduler::load( Handle hand return handle.visit>( overload { []( Handle l ) { return l; }, [&]( Handle n ) -> Result { - sketch_graph_.add_dependency( current_schedule_step_.value(), n ); + if ( !go_for_it_ ) { + sketch_graph_.add_dependency( current_schedule_step_.value(), n ); + } if ( relater_->get().contains( n ) ) { relater_->get().get( n ); return n; @@ -59,7 +61,9 @@ SketchGraphScheduler::Result SketchGraphScheduler::load( Handleget().get_handle( handle ).value(); return handle.visit>( [&]( auto h ) -> Result { - sketch_graph_.add_dependency( current_schedule_step_.value(), h ); + if ( !go_for_it_ ) { + sketch_graph_.add_dependency( current_schedule_step_.value(), h ); + } if ( relater_->get().contains( h ) ) { relater_->get().get( h ); @@ -90,7 +94,9 @@ SketchGraphScheduler::Result SketchGraphScheduler::loadShallow( Handle< bool SketchGraphScheduler::loadShallow( Handle handle, Handle ref ) { return ref.visit( [&]( auto r ) -> bool { - sketch_graph_.add_dependency( current_schedule_step_.value(), r ); + if ( !go_for_it_ ) { + sketch_graph_.add_dependency( current_schedule_step_.value(), r ); + } if ( relater_->get().contains_shallow( handle ) ) { relater_->get().get_shallow( handle ); @@ -113,7 +119,9 @@ SketchGraphScheduler::Result SketchGraphScheduler::load( Handleget().unref( handle ); auto d = h.visit>( []( auto h ) { return h; } ); - sketch_graph_.add_dependency( current_schedule_step_.value(), d ); + if ( !go_for_it_ ) { + sketch_graph_.add_dependency( current_schedule_step_.value(), d ); + } if ( relater_->get().contains( h ) ) { relater_->get().get( h ); @@ -175,13 +183,34 @@ LocalScheduler::Result LocalScheduler::select_single( Handle h, return nil; } + auto handle_tree_ref = [&]( auto x ) { + auto tree = relater_->get().get_shallow_tmp( relater_->get().unref( x ) ); + auto res = tree.value()->span()[i]; + res.template unwrap().template unwrap().template visit( overload { + [&]( Handle x ) { + x.visit( overload { + [&]( Handle x ) { + x.visit( overload { + [&]( Handle x ) { res = Handle( x ); }, + []( Handle ) {}, + } ); + }, + [&]( Handle x ) { res = relater_->get().ref( x ).unwrap(); }, + []( Handle ) {}, + []( Handle ) {}, + } ); + }, + [&]( Handle x ) { res = relater_->get().ref( x ).unwrap(); }, + []( Handle ) {}, + []( Handle ) {}, + } ); + + return res.template unwrap().template unwrap(); + }; + // h is one of BlobRef, ValueTreeRef or ObjectTreeRef auto result = h.visit>( overload { - [&]( Handle x ) { - return loadShallow( relater_->get().unref( x ) ) - .and_then( [&]( Handle x ) { return relater_->get().get_shallow( x ); } ) - .transform( [&]( TreeData d ) { return d->span()[i].unwrap().unwrap(); } ); - }, + [&]( Handle x ) { return handle_tree_ref( x ); }, [&]( Handle v ) { return v.visit>( overload { [&]( Handle l ) { return Handle( l.view().substr( i, 1 ) ); }, @@ -190,11 +219,7 @@ LocalScheduler::Result LocalScheduler::select_single( Handle h, .and_then( [&]( Handle x ) { return relater_->get().get( x.unwrap() ); } ) .transform( [&]( BlobData b ) { return Handle( b->span()[i] ); } ); }, - [&]( Handle x ) { - return loadShallow( relater_->get().unref( x ) ) - .and_then( [&]( Handle x ) { return relater_->get().get_shallow( x ); } ) - .transform( [&]( TreeData d ) { return d->span()[i].unwrap().unwrap(); } ); - }, + [&]( Handle x ) { return handle_tree_ref( x ); }, []( auto ) -> Result { throw std::runtime_error( "Invalid select" ); }, } ); }, @@ -518,6 +543,7 @@ SketchGraphScheduler::Result SketchGraphScheduler::evalStrict( Handleget().put( goal, result.value() ); + sketch_graph_.erase_forward_dependencies( goal ); } if ( current_schedule_step_.has_value() @@ -585,6 +611,7 @@ SketchGraphScheduler::Result SketchGraphScheduler::force( Handle if ( result.has_value() ) { relater_->get().put( goal, result.value() ); + sketch_graph_.erase_forward_dependencies( goal ); } return result; @@ -1061,8 +1088,12 @@ optional> SketchGraphScheduler::run_passes( Handle top_ if ( thunk.has_value() ) { nested_ = false; go_for_it_ = true; - - return evaluator_.force( thunk.value() ); + auto result = evaluator_.force( thunk.value() ); + if ( !result.has_value() ) { + dynamic_pointer_cast( relater_->get().get_local() )->retry( r ); + } else { + return result; + } } } else { for ( auto job : unblocked ) { diff --git a/src/runtime/scheduler.hh b/src/runtime/scheduler.hh index b2404421..6a689ef9 100644 --- a/src/runtime/scheduler.hh +++ b/src/runtime/scheduler.hh @@ -66,7 +66,7 @@ class SketchGraphScheduler : public Scheduler { friend class RelaterTest; -private: +protected: std::vector passes_; virtual Result run_passes( Handle top_level_job ); bool loadShallow( Handle, Handle ); @@ -119,11 +119,3 @@ public: PassRunner::PassType::InOutSource } ) {} }; - -class RandomScheduler : public SketchGraphScheduler -{ -public: - RandomScheduler() - : SketchGraphScheduler( { PassRunner::PassType::Random } ) - {} -}; diff --git a/src/storage/interface.hh b/src/storage/interface.hh index 264c337c..c12b2d0a 100644 --- a/src/storage/interface.hh +++ b/src/storage/interface.hh @@ -43,7 +43,22 @@ public: virtual std::optional> get( Handle name ) = 0; ///}@ - virtual std::optional> get_handle( Handle ) = 0; + /* + * Given the Handle @p name to a Tree, return the handle with the accurate type information that refers to the + * same Tree. + * + * @param name The name of the Tree to look up. + * @return The corresponding tightliest bounded Handle that refers to the same Tree. + */ + virtual std::optional> get_handle( Handle name ) = 0; + + /* + * Given the Handle @p name to a Tree, return the name of Tree where each entry is the Refed version of the + * corresponding entry of the input Tree. + * + * @param name The name of the Tree to get a shallow copy of. + * @return The Tree with each entry shallowed. + */ virtual std::optional get_shallow( Handle name ) = 0; /** diff --git a/src/storage/repository.cc b/src/storage/repository.cc index c49f3a28..a49cbf47 100644 --- a/src/storage/repository.cc +++ b/src/storage/repository.cc @@ -12,8 +12,11 @@ using namespace std; namespace fs = std::filesystem; -Repository::Repository( std::filesystem::path directory ) +Repository::Repository( size_t fix_table_size, std::filesystem::path directory ) : repo_( find( directory ) ) + , blobs_( fix_table_size ) + , trees_( fix_table_size ) + , relations_( fix_table_size ) { VLOG( 1 ) << "using repository " << repo_; diff --git a/src/storage/repository.hh b/src/storage/repository.hh index e870e605..7872f017 100644 --- a/src/storage/repository.hh +++ b/src/storage/repository.hh @@ -14,12 +14,12 @@ class Repository : public IRuntime { std::filesystem::path repo_; - FixTable blobs_ { 1000000 }; - FixTable trees_ { 1000000 }; - FixTable relations_ { 1000000 }; + FixTable blobs_; + FixTable trees_; + FixTable relations_; public: - Repository( std::filesystem::path directory = std::filesystem::current_path() ); + Repository( size_t fix_table_size = 1000000, std::filesystem::path directory = std::filesystem::current_path() ); static std::filesystem::path find( std::filesystem::path directory = std::filesystem::current_path() ); std::unordered_set> data() const override; diff --git a/src/tester/CMakeLists.txt b/src/tester/CMakeLists.txt index 5c0e3e2c..8ac4db56 100644 --- a/src/tester/CMakeLists.txt +++ b/src/tester/CMakeLists.txt @@ -7,17 +7,11 @@ target_include_directories (fix PUBLIC "${PROJECT_SOURCE_DIR}/src/tests") add_executable(fixpoint-server "fixpoint-server.cc" "tester-utils.cc") target_link_libraries(fixpoint-server runtime) +add_executable(data-server "data-server.cc" "tester-utils.cc") +target_link_libraries(data-server runtime) + add_executable(fixpoint-client "fixpoint-client.cc" "main.cc" "tester-utils.cc") target_link_libraries(fixpoint-client runtime) -add_executable(fixpoint-client-wikipedia "fixpoint-client-wikipedia.cc" "tester-utils.cc") -target_link_libraries(fixpoint-client-wikipedia runtime) - -add_executable(fixpoint-client-compile "fixpoint-client-compile.cc" "tester-utils.cc") -target_link_libraries(fixpoint-client-compile runtime) - -add_executable(fixpoint-client-bptree "fixpoint-client-bptree.cc" "tester-utils.cc") -target_link_libraries(fixpoint-client-bptree runtime) - # add_executable(http-tester "http-tester.cc" "main.cc" "tester-utils.cc") # target_link_libraries(http-tester runtime http) diff --git a/src/tester/data-server.cc b/src/tester/data-server.cc new file mode 100644 index 00000000..80f3445f --- /dev/null +++ b/src/tester/data-server.cc @@ -0,0 +1,29 @@ +#include "option-parser.hh" +#include "runtimes.hh" + +using namespace std; + +int main( int argc, char* argv[] ) +{ + OptionParser parser( "data-server", "Run a data server" ); + + uint16_t port; + size_t latency; + parser.AddArgument( + "listening-port", OptionParser::ArgumentCount::One, [&]( const char* argument ) { port = stoi( argument ); } ); + parser.AddOption( + 'r', "response-latency", "response-latency", "Latency of request responess in us", [&]( const char* argument ) { + latency = stoi( argument ); + } ); + + parser.Parse( argc, argv ); + + DataServer::latency = latency; + Address listen_address( "0.0.0.0", port ); + + auto server = DataServerRT::init( listen_address ); + + server->join(); + + return 0; +} diff --git a/src/tester/fixpoint-client-bptree.cc b/src/tester/fixpoint-client-bptree.cc deleted file mode 100644 index bd2c5090..00000000 --- a/src/tester/fixpoint-client-bptree.cc +++ /dev/null @@ -1,87 +0,0 @@ -#include -#include -#include -#include - -#include "object.hh" -#include "runtimes.hh" -#include "tester-utils.hh" - -using namespace std; - -int main( int argc, char* argv[] ) -{ - shared_ptr rt; - - if ( argc != 4 ) { - cerr << "Usage: +[address]:[port] [key-list] [# of keys]" << endl; - return -1; - } - - if ( argv[1][0] == '+' ) { - string addr( &argv[1][1] ); - if ( addr.find( ':' ) == string::npos ) { - throw runtime_error( "invalid argument " + addr ); - } - Address address( addr.substr( 0, addr.find( ':' ) ), stoi( addr.substr( addr.find( ':' ) + 1 ) ) ); - rt = Client::init( address ); - } else { - exit( 1 ); - } - - auto bptree_get = rt->get_rt() - .labeled( "bptree-get" ) - .unwrap() - .unwrap() - .unwrap() - .unwrap(); - rt->get_rt().get( bptree_get ); - - auto tree_root = rt->get_rt() - .labeled( "tree-root" ) - .unwrap() - .unwrap() - .unwrap() - .unwrap(); - - auto selection_tree = OwnedMutTree::allocate( 2 ); - selection_tree[0] = tree_root; - selection_tree[1] = Handle( (uint64_t)0 ); - auto select = rt->get_rt().create( make_shared( std::move( selection_tree ) ) ).unwrap(); - - ifstream keys_list( argv[2] ); - vector keys; - for ( int i = 0; i < atoi( argv[3] ); i++ ) { - int key; - keys_list >> key; - keys.push_back( key ); - } - - auto parallel_tree = OwnedMutTree::allocate( keys.size() ); - size_t index = 0; - for ( auto key : keys ) { - auto tree = OwnedMutTree::allocate( 5 ); - tree.at( 0 ) = make_limits( rt->get_rt(), 1024 * 1024, 1024, 1 ).into(); - tree.at( 1 ) = bptree_get; - tree.at( 2 ) = Handle( Handle( Handle( select ) ) ); - tree.at( 3 ) = tree_root; - tree.at( 4 ) = Handle( (int)key ); - auto combination = rt->get_rt().create( make_shared( std::move( tree ) ) ).unwrap(); - auto application = Handle( combination ); - parallel_tree[index] = application; - index++; - } - - auto combination - = rt->get_rt().create( make_shared( std::move( parallel_tree ) ) ).unwrap(); - - auto start = chrono::steady_clock::now(); - auto res = rt->execute( Handle( combination ) ); - auto end = chrono::steady_clock::now(); - chrono::duration diff = end - start; - - cout << "Result:\n" << res << endl; - cout << "Duration [seconds]: " << diff << endl; - - return 0; -} diff --git a/src/tester/fixpoint-client-compile.cc b/src/tester/fixpoint-client-compile.cc deleted file mode 100644 index 8b3cb76b..00000000 --- a/src/tester/fixpoint-client-compile.cc +++ /dev/null @@ -1,56 +0,0 @@ -#include -#include - -#include "handle.hh" -#include "runtimes.hh" -#include "tester-utils.hh" -#include "types.hh" - -using namespace std; - -int main( int argc, char* argv[] ) -{ - std::shared_ptr rt; - - if ( argc != 3 ) - cerr << "Usage: +[address]:[port] [label-to-compile]" << endl; - - if ( argv[1][0] == '+' ) { - string addr( &argv[1][1] ); - if ( addr.find( ':' ) == string::npos ) { - throw runtime_error( "invalid argument " + addr ); - } - Address address( addr.substr( 0, addr.find( ':' ) ), stoi( addr.substr( addr.find( ':' ) + 1 ) ) ); - rt = Client::init( address ); - } else { - exit( 1 ); - } - - auto compile_encode = rt->get_rt() - .labeled( "compile-encode" ) - .unwrap() - .unwrap() - .unwrap() - .unwrap(); - - auto target = Handle( - rt->get_rt().labeled( argv[2] ).unwrap().unwrap().unwrap().unwrap() ); - - auto application = OwnedMutTree::allocate( 3 ); - application[0] = make_limits( rt->get_rt(), 1024 * 1024 * 1024, 1024 * 1024, 1 ); - application[1] = compile_encode; - application[2] = target; - - auto handle = rt->get_rt().create( make_shared( std::move( application ) ) ).unwrap(); - - auto start = chrono::steady_clock::now(); - auto res = rt->execute( Handle( Handle( Handle( handle::upcast( handle ) ) ) ) ); - auto end = chrono::steady_clock::now(); - chrono::duration diff = end - start; - - // print the result - cout << "Result:\n" << res << endl; - cout << "Duration [seconds]: " << diff << endl; - - return 0; -} diff --git a/src/tester/fixpoint-client-wikipedia.cc b/src/tester/fixpoint-client-wikipedia.cc deleted file mode 100644 index f583726a..00000000 --- a/src/tester/fixpoint-client-wikipedia.cc +++ /dev/null @@ -1,192 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include "handle.hh" -#include "handle_post.hh" -#include "object.hh" -#include "runtimes.hh" -#include "tester-utils.hh" -#include "types.hh" - -using namespace std; - -int min_args = 0; -int max_args = -1; - -Handle compile( IRuntime& rt, std::string_view file ) -{ - auto wasm = rt.create( std::make_shared( file ) ); - auto compile = rt.labeled( "compile-encode" ).unwrap().unwrap().unwrap(); - auto application = OwnedMutTree::allocate( 3 ); - application[0] = make_limits( rt, 1024 * 1024 * 1024, 1024 * 1024, 1, false ); - application[1] = Handle( Handle( compile ) ); - application[2] = wasm; - auto handle = rt.create( make_shared( std::move( application ) ) ).unwrap(); - return Handle( handle::upcast( handle ) ); -} - -void progress( double x ) -{ - x = std::clamp( x, 0.0, 1.0 ); - struct winsize w; - ioctl( STDOUT_FILENO, TIOCGWINSZ, &w ); - cerr << "\r"; - cerr << "["; - for ( int i = 0; i < w.ws_col - 2; i++ ) { - if ( i / (double)( w.ws_col - 2 ) < x ) { - cerr << "#"; - } else { - cerr << " "; - } - } - cerr << "]"; -} - -int main( int argc, char* argv[] ) -{ - if ( argc != 3 && argc != 4 ) { - cerr << "Usage: fixpoint-client-wikipedia needle-string haystack-label [address:port]" << endl; - cerr << "\tset the environment variable SMALL_SLICES to use 10 MiB slices (default 1 GiB)" << endl; - exit( 1 ); - } - - auto needle_string = std::string( argv[1] ); - auto haystack_label = std::string( argv[2] ); - - /* auto ro_rt = [&] { */ - /* auto client = ReadOnlyRT::init(); */ - /* std::shared_ptr frontend = dynamic_pointer_cast( client ); */ - /* return make_pair( frontend, std::reference_wrapper( client->get_rt() ) ); */ - /* }; */ - auto rw_rt = [&] { - auto client = ReadWriteRT::init(); - std::shared_ptr frontend = dynamic_pointer_cast( client ); - return make_pair( frontend, std::reference_wrapper( client->get_rt() ) ); - }; - auto client_rt = [&] { - std::shared_ptr client; - string addr( argv[3] ); - if ( addr.find( ':' ) == string::npos ) { - throw runtime_error( "invalid argument " + addr ); - } - Address address( addr.substr( 0, addr.find( ':' ) ), stoi( addr.substr( addr.find( ':' ) + 1 ) ) ); - client = Client::init( address ); - std::shared_ptr frontend = dynamic_pointer_cast( client ); - return make_pair( frontend, std::reference_wrapper( client->get_rt() ) ); - }; - auto [client, rt] = argc == 3 ? rw_rt() : client_rt(); - - auto mapreduce = client->execute( - Handle( compile( rt, "build/applications-prefix/src/applications-build/mapreduce/mapreduce.wasm" ) ) ); - auto mapper = client->execute( Handle( - compile( rt, "build/applications-prefix/src/applications-build/count-words/count_words.wasm" ) ) ); - auto reducer = client->execute( Handle( - compile( rt, "build/applications-prefix/src/applications-build/count-words/merge_counts.wasm" ) ) ); - - auto needle_blob = OwnedMutBlob::allocate( needle_string.size() ); - memcpy( needle_blob.data(), needle_string.data(), needle_string.size() ); - auto needle = rt.create( std::make_shared( std::move( needle_blob ) ) ); - - auto haystack = handle::data( rt.labeled( haystack_label ) ) - .value() - .visit>( overload { - [&]( Handle blob ) { return Handle( blob ); }, - [&]( Handle blob ) { return Handle( blob ); }, - [&]( Handle ref ) { return ref; }, - [&]( auto ) -> Handle { throw runtime_error( "haystack label was not a blob" ); }, - } ); - -#define KiB 1024 -#define MiB ( 1024 * KiB ) -#define GiB ( 1024 * MiB ) - - const size_t TOTAL_SIZE = handle::size( haystack ); - const size_t LEAF_SIZE = getenv( "SMALL_SLICES" ) ? 10 * MiB : 1 * GiB; - const size_t LEAVES = std::ceil( TOTAL_SIZE / (double)LEAF_SIZE ); - cerr << "Running " << LEAVES << " parallel maps." << endl; - bool precompute_only = needle_string.empty(); - if ( precompute_only ) { - cerr << "Precomputing selections." << endl; - progress( 0 ); - } - auto args = OwnedMutTree::allocate( LEAVES ); - for ( size_t i = 0; i < LEAVES; i++ ) { - if ( precompute_only ) - progress( i / (double)LEAVES ); - auto selection = OwnedMutTree::allocate( 3 ); - selection[0] = haystack; - selection[1] = Handle( LEAF_SIZE * i ); - selection[2] = Handle( std::min( LEAF_SIZE * ( i + 1 ), TOTAL_SIZE ) ); - auto handle = rt.create( make_shared( std::move( selection ) ) ).unwrap(); - auto select = Handle( Handle( handle ) ); - if ( precompute_only ) { - // force this to execute and be cached - client->execute( Handle( select ) ); - } - - auto arg_tree = OwnedMutTree::allocate( 2 ); - arg_tree[0] = needle; - arg_tree[1] = select; - args[i] = rt.create( make_shared( std::move( arg_tree ) ) ).unwrap(); - } - - if ( precompute_only ) { - cerr << "No search query provided." << endl; - exit( EXIT_SUCCESS ); - } - - auto tree = rt.create( make_shared( std::move( args ) ) ).unwrap(); - - auto toplevel = OwnedMutTree::allocate( 7 ); - toplevel[0] = make_limits( rt, 1024 * 1024 * 1024, 1024 * 1024, 1, false ); - toplevel[1] = mapreduce; - toplevel[2] = mapper; - toplevel[3] = reducer; - toplevel[4] = Handle( tree, LEAVES ); - toplevel[5] = make_limits( rt, 1024 * 1024 * 1024, 256 * 8, 1, false ); - toplevel[6] = make_limits( rt, 1024 * 1024 * 1024, 256 * 8, 1, true ); - - auto handle = rt.create( make_shared( std::move( toplevel ) ) ).unwrap(); - cerr << "Executing." << endl; - struct timespec before, after; - struct timespec before_real, after_real; - if ( clock_gettime( CLOCK_REALTIME, &before_real ) ) { - perror( "clock_gettime" ); - exit( 1 ); - } - if ( clock_gettime( CLOCK_PROCESS_CPUTIME_ID, &before ) ) { - perror( "clock_gettime" ); - exit( 1 ); - } - auto res = client->execute( Handle( Handle( Handle( handle::upcast( handle ) ) ) ) ); - if ( clock_gettime( CLOCK_PROCESS_CPUTIME_ID, &after ) ) { - perror( "clock_gettime" ); - exit( 1 ); - } - if ( clock_gettime( CLOCK_REALTIME, &after_real ) ) { - perror( "clock_gettime" ); - exit( 1 ); - } - - double delta = after.tv_sec - before.tv_sec + (double)( after.tv_nsec - before.tv_nsec ) * 1e-9; - double delta_real - = after_real.tv_sec - before_real.tv_sec + (double)( after_real.tv_nsec - before_real.tv_nsec ) * 1e-9; - - // print the result - cerr << "Result: " << res << endl; - cerr << "Handle: " << res.content << endl; - if ( argc == 3 ) { - cerr << "CPU Time: " << delta << " seconds" << endl; - } - cerr << "Real Time: " << delta_real << " seconds" << endl; - uint64_t count = uint64_t( handle::extract( res ).value() ); - cerr << "Count: " << count << endl; - - return 0; -} diff --git a/src/tester/fixpoint-server.cc b/src/tester/fixpoint-server.cc index 14e7578b..effb5032 100644 --- a/src/tester/fixpoint-server.cc +++ b/src/tester/fixpoint-server.cc @@ -47,6 +47,8 @@ int main( int argc, char* argv[] ) optional local; optional peerfile; optional sche_opt; + optional threads; + parser.AddArgument( "listening-port", OptionParser::ArgumentCount::One, [&]( const char* argument ) { port = stoi( argument ); } ); parser.AddOption( 'a', @@ -59,13 +61,15 @@ int main( int argc, char* argv[] ) 'p', "peers", "peers", "Path to a file that contains a list of all servers.", [&]( const char* argument ) { peerfile = argument; } ); + parser.AddOption( 's', "scheduler", "scheduler", "Scheduler to use [onepass, hint]", [&]( const char* argument ) { + sche_opt = argument; + if ( not( *sche_opt == "onepass" or *sche_opt == "hint" ) ) { + throw runtime_error( "Invalid scheduler: " + sche_opt.value() ); + } + } ); parser.AddOption( - 's', "scheduler", "scheduler", "Scheduler to use [onepass, hint, random]", [&]( const char* argument ) { - sche_opt = argument; - if ( not( *sche_opt == "onepass" or *sche_opt == "hint" or *sche_opt == "random" ) ) { - throw runtime_error( "Invalid scheduler: " + sche_opt.value() ); - } - } ); + 't', "threads", "#", "Number of threads", [&]( const char* argument ) { threads = stoull( argument ); } ); + parser.Parse( argc, argv ); Address listen_address( "0.0.0.0", port ); @@ -97,16 +101,14 @@ int main( int argc, char* argv[] ) shared_ptr scheduler = make_shared(); if ( sche_opt.has_value() ) { - if ( *sche_opt == "onepass" ) { - scheduler = make_shared(); - } else if ( *sche_opt == "hint" ) { + if ( *sche_opt == "hint" ) { scheduler = make_shared(); - } else if ( *sche_opt == "random" ) { - scheduler = make_shared(); + } else if ( *sche_opt == "local" ) { + scheduler = make_shared(); } } - auto server = Server::init( listen_address, scheduler, peer_address ); + auto server = Server::init( listen_address, scheduler, peer_address, threads ); cout << "Server initialized" << endl; server->join(); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 5b1fe9c2..378119a5 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -84,7 +84,3 @@ target_link_libraries(test-return-flatware runtime) add_executable(test-python-flatware test-python-flatware.cc fixpoint-test-main.cc) target_link_libraries(test-python-flatware runtime) - -add_executable(create-bptree create-bptree.cc) -target_link_libraries(create-bptree runtime) - diff --git a/src/tests/bptree-helper.hh b/src/tests/bptree-helper.hh index 249a1a44..d482e032 100644 --- a/src/tests/bptree-helper.hh +++ b/src/tests/bptree-helper.hh @@ -1,3 +1,5 @@ +#pragma once + #include "bptree.hh" #include "handle.hh" #include "object.hh" @@ -5,22 +7,41 @@ #include namespace bptree { -static Handle to_storage_keys( RuntimeStorage& storage, Node* node ) + +template +static Handle to_storage_keys( RuntimeStorage& storage, Node* node ) { - const auto& keys = node->get_keys(); - auto blob = OwnedMutBlob::allocate( keys.size() * sizeof( int ) + 1 ); + auto key_data = node->get_key_data(); + auto blob = OwnedMutBlob::allocate( key_data.size() + 1 ); blob[0] = node->is_leaf(); - memcpy( blob.data() + 1, keys.data(), keys.size() * sizeof( int ) ); + memcpy( blob.data() + 1, key_data.data(), key_data.size() ); return storage.create( std::make_shared( std::move( blob ) ) ); } -static std::deque> to_storage_leaves( RuntimeStorage& storage, BPTree& bptree ) +template +Handle to_storage_data( RuntimeStorage& storage, const Val& val ) +{ + auto d = OwnedMutBlob::allocate( val.size() ); + memcpy( d.data(), val.data(), val.size() ); + return storage.create( std::make_shared( std::move( d ) ) ); +} + +template<> +inline Handle to_storage_data( RuntimeStorage& storage, const int& i ) +{ + auto d = OwnedMutBlob::allocate( sizeof( i ) ); + memcpy( d.data(), &i, sizeof( i ) ); + return storage.create( std::make_shared( std::move( d ) ) ); +} + +template +static std::deque> to_storage_leaves( RuntimeStorage& storage, BPTree& bptree ) { static auto nil = storage.create( std::make_shared( OwnedMutTree::allocate( 0 ) ) ).unwrap(); - std::deque leaf_nodes; - bptree.dfs_visit( [&]( Node* node ) { + std::deque*> leaf_nodes; + bptree.dfs_visit( [&]( Node* node ) { if ( node->is_leaf() ) { leaf_nodes.push_back( node ); } @@ -33,19 +54,15 @@ static std::deque> to_storage_leaves( RuntimeStorage& stora leaf_nodes.pop_back(); auto tree = OwnedMutTree::allocate( 1 + bptree.get_degree() + 1 ); - tree[0] = to_storage_keys( storage, node ) - .visit>( overload { + tree[0] = to_storage_keys( storage, node ) + .template visit>( overload { []( Handle l ) { return l; }, []( Handle n ) { return Handle( n ); }, } ); size_t i = 1; for ( const auto& data : node->get_data() ) { - auto d = OwnedMutBlob::allocate( data.size() ); - memcpy( d.data(), data.data(), data.size() ); - tree[i] = storage.create( std::make_shared( std::move( d ) ) ) - .visit>( overload { []( Handle l ) { return l; }, - []( Handle n ) { return Handle( n ); } } ); + tree[i] = to_storage_data( storage, data ); i++; } @@ -59,14 +76,18 @@ static std::deque> to_storage_leaves( RuntimeStorage& stora tree[i] = Handle( nil, 0 ); } - last = storage.ref( storage.create( std::make_shared( std::move( tree ) ) ) ).unwrap(); + last = storage.ref( storage.create( std::make_shared( std::move( tree ) ) ) ) + .template unwrap(); result.push_back( last.value() ); } return result; } -static std::deque> to_repo_leaves( RuntimeStorage& storage, Repository& repo, BPTree& bptree ) +template +static std::deque> to_repo_leaves( RuntimeStorage& storage, + Repository& repo, + BPTree& bptree ) { auto res = to_storage_leaves( storage, bptree ); @@ -75,7 +96,7 @@ static std::deque> to_repo_leaves( RuntimeStorage& storage, auto tree = storage.get( unref ); repo.put( unref, tree ); for ( auto d : tree->span() ) { - d.unwrap().unwrap().unwrap().visit( + d.template unwrap().template unwrap().template unwrap().template visit( overload { [&]( Handle b ) { auto unref = b.unwrap().unwrap(); auto blob = storage.get( unref ); @@ -88,8 +109,9 @@ static std::deque> to_repo_leaves( RuntimeStorage& storage, return res; } +template static Handle to_storage_internal( RuntimeStorage& storage, - const std::shared_ptr& node, + const std::shared_ptr>& node, std::deque>& leaves ) { if ( node->is_leaf() ) { @@ -99,8 +121,8 @@ static Handle to_storage_internal( RuntimeStorage& storage, } else { size_t tree_size = 1 + ( node->is_leaf() ? node->get_data().size() : node->get_children().size() ); auto tree = OwnedMutTree::allocate( tree_size ); - tree[0] = to_storage_keys( storage, node.get() ) - .visit>( overload { + tree[0] = to_storage_keys( storage, node.get() ) + .template visit>( overload { []( Handle l ) { return l; }, []( Handle n ) { return Handle( n ); }, } ); @@ -116,9 +138,10 @@ static Handle to_storage_internal( RuntimeStorage& storage, } } +template static Handle to_repo_internal( RuntimeStorage& storage, Repository& repo, - const std::shared_ptr& node, + const std::shared_ptr>& node, std::deque>& leaves ) { if ( node->is_leaf() ) { @@ -128,8 +151,8 @@ static Handle to_repo_internal( RuntimeStorage& storage, } else { size_t tree_size = 1 + ( node->is_leaf() ? node->get_data().size() : node->get_children().size() ); auto tree = OwnedMutTree::allocate( tree_size ); - tree[0] = to_storage_keys( storage, node.get() ) - .visit>( overload { + tree[0] = to_storage_keys( storage, node.get() ) + .template visit>( overload { []( Handle l ) { return l; }, [&]( Handle n ) { repo.put( n, storage.get( n ) ); @@ -152,13 +175,15 @@ static Handle to_repo_internal( RuntimeStorage& storage, } } -static Handle to_storage( RuntimeStorage& storage, BPTree& tree ) +template +static Handle to_storage( RuntimeStorage& storage, BPTree& tree ) { auto leaves = to_storage_leaves( storage, tree ); return to_storage_internal( storage, tree.get_root(), leaves ); } -static Handle to_repo( RuntimeStorage& storage, Repository& repo, BPTree& tree ) +template +static Handle to_repo( RuntimeStorage& storage, Repository& repo, BPTree& tree ) { auto leaves = to_repo_leaves( storage, repo, tree ); return to_repo_internal( storage, repo, tree.get_root(), leaves ); diff --git a/src/tests/bptree.hh b/src/tests/bptree.hh index 71aab0f6..856c96a0 100644 --- a/src/tests/bptree.hh +++ b/src/tests/bptree.hh @@ -4,50 +4,74 @@ #include #include #include +#include #include +template class Node; + +template class BPTree; +template class Node { - std::vector keys_ {}; + std::vector keys_ {}; std::vector> children_ {}; - std::vector data_ {}; + std::vector data_ {}; bool isleaf_ { true }; - friend class BPTree; + friend class BPTree; - std::pair, int> split(); + std::pair, Key> split(); void dfs_visit( std::function visitor ); + std::string key_data_buf_ {}; + public: Node() {} - const std::vector& get_keys() const { return keys_; } - const std::vector& get_data() const { return data_; } + const std::vector& get_keys() const { return keys_; } + std::string_view get_key_data() + { + return { reinterpret_cast( keys_.data() ), keys_.size() * sizeof( Key ) }; + } + const std::vector& get_data() const { return data_; } const std::vector> get_children() const { return children_; } bool is_leaf() const { return isleaf_; } + + void set_keys( std::vector keys ) { keys_ = keys; } + void set_data( std::vector data ) + { + if ( !isleaf_ ) { + throw std::runtime_error( "Non leaf BPTree node should not have data." ); + } + data_ = data; + } }; +template class BPTree { const size_t degree_; - std::shared_ptr root_ { std::make_shared() }; + std::shared_ptr> root_ { std::make_shared>() }; public: BPTree( size_t degree ) : degree_( degree ) {} - void insert( int key, std::string value ); - void dfs_visit( std::function visitor ); - std::optional get( int key ); - std::shared_ptr get_root() { return root_; } + void insert( Key key, Value value ); + void dfs_visit( std::function* )> visitor ); + std::optional get( Key key ); + std::shared_ptr> get_root() { return root_; } + + void set_root( std::shared_ptr> root ) { root_ = root; } size_t get_degree() const { return degree_; } }; -std::pair, int> Node::split() +template +std::pair>, Key> Node::split() { auto res = std::make_shared(); @@ -75,10 +99,11 @@ std::pair, int> Node::split() return { res, middle_key }; } -void BPTree::insert( int key, std::string value ) +template +void BPTree::insert( Key key, Value value ) { - std::shared_ptr cursor = root_; - std::deque> path; + std::shared_ptr> cursor = root_; + std::deque>> path; while ( cursor->isleaf_ == false ) { path.push_back( cursor ); @@ -114,7 +139,7 @@ void BPTree::insert( int key, std::string value ) } // Create a new root - auto new_root = std::make_shared(); + auto new_root = std::make_shared>(); new_root->isleaf_ = false; new_root->keys_.push_back( new_node.second ); new_root->children_.push_back( new_node.first ); @@ -124,9 +149,10 @@ void BPTree::insert( int key, std::string value ) return; } -std::optional BPTree::get( int key ) +template +std::optional BPTree::get( Key key ) { - std::shared_ptr cursor = root_; + std::shared_ptr> cursor = root_; while ( cursor->isleaf_ == false ) { size_t idx = upper_bound( cursor->keys_.begin(), cursor->keys_.end(), key ) - cursor->keys_.begin(); @@ -142,7 +168,8 @@ std::optional BPTree::get( int key ) } } -void Node::dfs_visit( std::function visitor ) +template +void Node::dfs_visit( std::function* )> visitor ) { for ( const auto& child : children_ ) { child->dfs_visit( visitor ); @@ -150,7 +177,8 @@ void Node::dfs_visit( std::function visitor ) visitor( this ); } -void BPTree::dfs_visit( std::function visitor ) +template +void BPTree::dfs_visit( std::function* )> visitor ) { root_->dfs_visit( visitor ); } diff --git a/src/tests/create-bptree.cc b/src/tests/create-bptree.cc deleted file mode 100644 index 25489861..00000000 --- a/src/tests/create-bptree.cc +++ /dev/null @@ -1,75 +0,0 @@ -#include "bptree-helper.hh" -#include "option-parser.hh" -#include "relater.hh" -#include -#include -#include - -using namespace std; - -int main( int argc, char* argv[] ) -{ - if ( argc < 7 ) { - cerr << "Usage: " << argv[0] - << " [degree] [number of entries] [path to list of keys] [number of samples] [path to randomly sampled " - "keys in tree] [path to randomly sampled keys not in tree]" - << endl; - } - - auto rt = std::make_shared( 0 ); - size_t degree = stoi( argv[1] ); - BPTree tree( degree ); - size_t num_of_entries = stoi( argv[2] ); - - ofstream keys_file( argv[3] ); - - random_device rd; - mt19937 gen( rd() ); - uniform_int_distribution distrib( INT_MIN, INT_MAX ); - set key_set {}; - - while ( key_set.size() < num_of_entries ) { - auto idx = distrib( gen ); - if ( !key_set.contains( idx ) ) { - tree.insert( idx, std::to_string( idx ) ); - key_set.insert( idx ); - } - } - - for ( const auto& key : key_set ) { - keys_file << key << endl; - } - - keys_file.close(); - - size_t num_of_samples = stoi( argv[4] ); - - { - unordered_set keys_in_tree; - ofstream keys_in_tree_sample( argv[5] ); - uniform_int_distribution distrib( 0, key_set.size() ); - while ( keys_in_tree.size() < num_of_samples ) { - auto idx = distrib( gen ); - auto key = *next( key_set.begin(), idx ); - if ( !keys_in_tree.contains( key ) ) { - keys_in_tree.insert( key ); - keys_in_tree_sample << key << endl; - } - } - } - - { - unordered_set keys_not_in_tree; - ofstream keys_not_in_tree_sample( argv[6] ); - uniform_int_distribution distrib( INT_MIN, INT_MAX ); - while ( keys_not_in_tree.size() < num_of_samples ) { - auto key = distrib( gen ); - if ( !key_set.contains( key ) and !keys_not_in_tree.contains( key ) ) { - keys_not_in_tree.insert( key ); - keys_not_in_tree_sample << key << endl; - } - } - } - - cout << Handle( bptree::to_repo( rt->get_storage(), rt->get_repository(), tree ) ).content << endl; -} diff --git a/src/tests/test-bptree-get.cc b/src/tests/test-bptree-get.cc index 5cd83866..610913f2 100644 --- a/src/tests/test-bptree-get.cc +++ b/src/tests/test-bptree-get.cc @@ -74,7 +74,7 @@ string fix_bptree_get_n( shared_ptr rt, void check_bptree_get( shared_ptr rt, Handle bptree_fix, Handle bptree_elf, - BPTree& tree, + BPTree& tree, int key ) { string fix_result = fix_bptree_get( rt, bptree_fix, bptree_elf, key ); @@ -92,7 +92,7 @@ void check_bptree_get( shared_ptr rt, void check_bptree_get_n( shared_ptr rt, Handle bptree_fix, Handle bptree_get_n_elf, - BPTree& tree, + BPTree& tree, int key, uint64_t n ) { @@ -101,26 +101,26 @@ void check_bptree_get_n( shared_ptr rt, string result; optional n_done; - tree.dfs_visit( [&]( Node* node ) { + tree.dfs_visit( [&]( Node* node ) { if ( node->is_leaf() ) { if ( n_done.has_value() && *n_done < n ) { for ( const auto& d : node->get_data() ) { result += d; result += " "; } - n_done.value()++; return; } if ( node->get_keys().front() <= key && node->get_keys().back() >= key ) { - n_done = 1; + n_done = 0; auto pos = upper_bound( node->get_keys().begin(), node->get_keys().end(), key ); auto idx = pos - node->get_keys().begin() - 1; for ( size_t i = idx; i < node->get_data().size(); i++ ) { result += node->get_data()[i]; result += " "; } + n_done.value()++; } } } ); @@ -134,7 +134,7 @@ void check_bptree_get_n( shared_ptr rt, void test( shared_ptr rt ) { size_t degree = 4; - BPTree tree( degree ); + BPTree tree( degree ); random_device rd; mt19937 gen( rd() ); @@ -170,7 +170,7 @@ void test( shared_ptr rt ) } } - check_bptree_get_n( rt, bptree_fix, bptree_get_n_elf, tree, *key_set.begin(), 4 ); - check_bptree_get_n( rt, bptree_fix, bptree_get_n_elf, tree, *std::next( key_set.begin(), 100 ), 4 ); - check_bptree_get_n( rt, bptree_fix, bptree_get_n_elf, tree, *std::next( key_set.end(), -4 ), 4 ); + check_bptree_get_n( rt, bptree_fix, bptree_get_n_elf, tree, *key_set.begin(), 10 ); + check_bptree_get_n( rt, bptree_fix, bptree_get_n_elf, tree, *std::next( key_set.begin(), 100 ), 10 ); + check_bptree_get_n( rt, bptree_fix, bptree_get_n_elf, tree, *std::next( key_set.end(), -4 ), 10 ); } diff --git a/src/tests/test-bptree.cc b/src/tests/test-bptree.cc index f5937095..e184880e 100644 --- a/src/tests/test-bptree.cc +++ b/src/tests/test-bptree.cc @@ -7,7 +7,7 @@ void test( void ) { size_t degree = 4; - BPTree tree( degree ); + BPTree tree( degree ); std::random_device rd; std::mt19937 gen( rd() ); @@ -29,7 +29,7 @@ void test( void ) int last = 0; - tree.dfs_visit( [&]( Node* node ) { + tree.dfs_visit( [&]( Node* node ) { if ( node->is_leaf() ) { for ( const auto& key : node->get_keys() ) { if ( key <= last ) { @@ -42,7 +42,7 @@ void test( void ) } } ); - tree.dfs_visit( [&]( Node* node ) { + tree.dfs_visit( [&]( Node* node ) { if ( node->is_leaf() ) { if ( node->get_keys().size() > degree ) { fprintf( stderr, "Leaf node violates degree" ); diff --git a/src/tests/test-distributed.cc b/src/tests/test-distributed.cc index 7f0d6822..0e8a651f 100644 --- a/src/tests/test-distributed.cc +++ b/src/tests/test-distributed.cc @@ -92,7 +92,7 @@ void client() FakeRuntime rt {}; FakeRuntime rt_check {}; - NetworkWorker nw( rt ); + NetworkWorker nw( rt ); nw.start(); nw.connect( address ); @@ -129,7 +129,7 @@ void server( int client_pid ) FakeRuntime rt {}; FakeRuntime rt_check {}; - NetworkWorker nw( rt ); + NetworkWorker nw( rt ); nw.start(); nw.start_server( address ); diff --git a/src/tests/test-evaluator.cc b/src/tests/test-evaluator.cc index cd37563b..2e29e5e5 100644 --- a/src/tests/test-evaluator.cc +++ b/src/tests/test-evaluator.cc @@ -277,7 +277,7 @@ void test( void ) CHECK_EQ( fib10, Handle( 55_literal64 ) ); CHECK_EQ( mapreduce_called, 9 ); - BPTree bptree( 4 ); + BPTree bptree( 4 ); for ( int i = 0; i < 10; i++ ) { bptree.insert( i, to_string( i ) ); } diff --git a/src/tests/test-local-scheduler.cc b/src/tests/test-local-scheduler.cc index 160dd460..8468654f 100644 --- a/src/tests/test-local-scheduler.cc +++ b/src/tests/test-local-scheduler.cc @@ -149,7 +149,7 @@ void test_fib( void ) void test_bptree_get( void ) { - BPTree bptree( 4 ); + BPTree bptree( 4 ); for ( size_t i = 0; i < 4; i++ ) { bptree.insert( i, to_string( i ) ); } diff --git a/src/tests/test-pass-scheduler.cc b/src/tests/test-pass-scheduler.cc index 0f1c4931..535514f8 100644 --- a/src/tests/test-pass-scheduler.cc +++ b/src/tests/test-pass-scheduler.cc @@ -7,6 +7,7 @@ #include "scheduler.hh" #include "test.hh" +#include #include #include @@ -16,27 +17,27 @@ class FakeRuntime : public IRuntime { public: RuntimeStorage storage_ {}; - vector> todos_ {}; + absl::flat_hash_set> todos_ {}; uint32_t parallelism_ {}; optional get( Handle name ) override { - todos_.push_back( name ); + todos_.insert( name ); return {}; } optional get( Handle name ) override { - todos_.push_back( handle::upcast( name ) ); + todos_.insert( handle::upcast( name ) ); return {}; }; optional> get( Handle name ) override { - todos_.push_back( name ); + todos_.insert( name ); return {}; } optional get_shallow( Handle handle ) override { - storage_.ref( handle ).visit( [&]( auto h ) { todos_.push_back( h ); } ); + storage_.ref( handle ).visit( [&]( auto h ) { todos_.insert( h ); } ); return {}; } @@ -98,7 +99,7 @@ void case_one( void ) rt->run( task ); if ( fake_worker->todos_.size() != 1 - or fake_worker->todos_.front() != Handle( Handle( task ) ) ) { + or !fake_worker->todos_.contains( Handle( Handle( task ) ) ) ) { fprintf( stderr, "Todo size %zu", fake_worker->todos_.size() ); for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -155,7 +156,7 @@ void case_two( void ) rt->run( task ); if ( fake_worker->todos_.size() != 1 - or fake_worker->todos_.front() != Handle( Handle( task ) ) ) { + or !fake_worker->todos_.contains( Handle( Handle( task ) ) ) ) { cout << "fake_worker->todos_.size " << fake_worker->todos_.size() << endl; for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -187,10 +188,9 @@ void case_three( void ) rt->run( task ); if ( fake_worker->todos_.size() != 1 - or ( fake_worker->todos_.front() - != Handle( Handle( - Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) - and fake_worker->todos_.front() != Handle( handle.unwrap() ) ) ) { + or ( !fake_worker->todos_.contains( Handle( Handle( + Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) ) + and !fake_worker->todos_.contains( Handle( handle.unwrap() ) ) ) ) { cout << "fake_worker->todos_.size " << fake_worker->todos_.size() << endl; for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -222,10 +222,9 @@ void case_four( void ) rt->run( task ); if ( fake_worker->todos_.size() != 1 - or ( fake_worker->todos_.front() - != Handle( Handle( - Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) - and fake_worker->todos_.front() != Handle( handle.unwrap() ) ) ) { + or ( !fake_worker->todos_.contains( Handle( Handle( + Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) ) + and !fake_worker->todos_.contains( Handle( handle.unwrap() ) ) ) ) { cout << "fake_worker->todos_.size " << fake_worker->todos_.size() << endl; for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -263,10 +262,9 @@ void case_five( void ) rt->run( task ); if ( fake_worker->todos_.size() != 1 - or ( fake_worker->todos_.front() - != Handle( Handle( - Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) - and fake_worker->todos_.front() != Handle( handle.unwrap() ) ) ) { + or ( !fake_worker->todos_.contains( Handle( Handle( + Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) ) + and !fake_worker->todos_.contains( Handle( handle.unwrap() ) ) ) ) { cout << "fake_worker->todos_.size " << fake_worker->todos_.size() << endl; for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -308,10 +306,9 @@ void case_six( void ) rt->run( task ); if ( fake_worker->todos_.size() != 1 - or ( fake_worker->todos_.front() - != Handle( Handle( - Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) - and fake_worker->todos_.front() != Handle( handle.unwrap() ) ) ) { + or ( !fake_worker->todos_.contains( Handle( Handle( + Handle( Handle( Handle( handle.unwrap() ) ) ) ) ) ) + and !fake_worker->todos_.contains( Handle( handle.unwrap() ) ) ) ) { cout << "fake_worker->todos_.size " << fake_worker->todos_.size() << endl; for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -364,10 +361,8 @@ void case_seven( void ) Handle( Handle( Handle( Handle( Handle( handle::upcast( tree( *rt, limits( *rt, 1, 1, 10 ), Handle( Handle( handle ) ) ) ) ) ) ) ) ) ); - if ( ( ( fake_worker->todos_[0] == job0 or fake_worker->todos_[0] == job0_1 ) - and fake_worker->todos_[1] == job1 ) - or ( ( fake_worker->todos_[1] == job0 or fake_worker->todos_[1] == job0_1 ) - and fake_worker->todos_[0] == job1 ) ) { + if ( ( ( fake_worker->todos_.contains( job0 ) or fake_worker->todos_.contains( job0_1 ) ) + and fake_worker->todos_.contains( job1 ) ) ) { return; } } @@ -420,7 +415,7 @@ void case_eight( void ) .unwrap() ); if ( fake_worker->todos_.size() != 1 - or fake_worker->todos_.front() != Handle( Handle( remote_task ) ) ) { + or !fake_worker->todos_.contains( Handle( Handle( remote_task ) ) ) ) { cout << "fake_worker->todos_.size " << fake_worker->todos_.size() << endl; for ( const auto& todo : fake_worker->todos_ ) { cout << "Todo " << todo << endl; @@ -478,8 +473,7 @@ void case_nine( void ) auto job1 = Handle( Handle( Handle( Handle( Handle( handle2.unwrap() ) ) ) ) ); // Handle( Handle( Blob 1 ) ) ) ) - if ( ( fake_worker->todos_[0] == job0 and fake_worker->todos_[1] == job1 ) - or ( fake_worker->todos_[1] == job0 and fake_worker->todos_[0] == job1 ) ) { + if ( fake_worker->todos_.contains( job0 ) and fake_worker->todos_.contains( job1 ) ) { return; } } @@ -543,8 +537,7 @@ void case_ten( void ) auto job1 = Handle( Handle( Handle( Handle( Handle( handle2.unwrap() ) ) ) ) ); // Handle( Handle( Blob 1 ) ) ) ) - if ( ( fake_worker->todos_[0] == job0 and fake_worker->todos_[1] == job1 ) - or ( fake_worker->todos_[1] == job0 and fake_worker->todos_[0] == job1 ) ) { + if ( fake_worker->todos_.contains( job0 ) and fake_worker->todos_.contains( job1 ) ) { return; } } @@ -605,7 +598,7 @@ void case_eleven( void ) if ( fake_worker->todos_.size() == 1 ) { // get_shallow( Tree 0 ) auto job0 = Handle( ref ); - if ( fake_worker->todos_[0] == job0 ) { + if ( fake_worker->todos_.contains( job0 ) ) { return; } } @@ -669,8 +662,7 @@ void case_twelve( void ) // get( Tree, 255 ) auto job1 = Handle( Handle( Handle( Handle( tree( *rt, ref, Handle( (uint64_t)255 ) ).unwrap() ) ) ) ); - if ( ( fake_worker->todos_[0] == job0 and fake_worker->todos_[1] == job1 ) - or ( fake_worker->todos_[1] == job0 and fake_worker->todos_[0] == job1 ) ) { + if ( fake_worker->todos_.contains( job0 ) and fake_worker->todos_.contains( job1 ) ) { return; } } diff --git a/src/tests/test-python-flatware.cc b/src/tests/test-python-flatware.cc index 5db016ff..5b191fda 100644 --- a/src/tests/test-python-flatware.cc +++ b/src/tests/test-python-flatware.cc @@ -5,6 +5,7 @@ #include "test.hh" using namespace std; +namespace fs = std::filesystem; namespace tester { shared_ptr rt; @@ -20,12 +21,43 @@ Handle dirent( string_view name, string_view permissions, Handle conte return tester::Tree( tester::Blob( name ), tester::Blob( permissions ), content ); } -Handle fs() +static string d = "040000"; +static string f = "100644"; + +Handle create_from_path( fs::path path ) { - static string d = "040000"; - static string f = "100644"; - static auto e = dirent( ".", d, tester::Tree( dirent( "main.py", f, tester::Blob( "print(2 + 2)" ) ) ) ); - return e; + if ( fs::is_regular_file( path ) ) { + return dirent( path.filename().string(), f, tester::File( path ) ); + } else if ( fs::is_directory( path ) ) { + size_t count = distance( fs::directory_iterator( path ), {} ); + OwnedMutTree tree = OwnedMutTree::allocate( count ); + + size_t i = 0; + for ( auto& subpath : fs::directory_iterator( path ) ) { + tree[i] = create_from_path( subpath ); + i++; + } + + return dirent( path.filename().string(), + d, + handle::upcast( tester::rt->create( std::make_shared( std::move( tree ) ) ) ) ); + } else { + throw std::runtime_error( "Invalid file type" ); + } +} + +Handle filesys( string python_src_path ) +{ + fs::path python_src { python_src_path }; + size_t count = distance( fs::directory_iterator( python_src ), {} ); + OwnedMutTree tree = OwnedMutTree::allocate( count + 1 ); + size_t i = 0; + for ( auto& subpath : fs::directory_iterator( python_src ) ) { + tree[i] = create_from_path( subpath ); + i++; + } + tree[count] = dirent( "main.py", f, tester::Blob( "print(2 + 2)" ) ); + return dirent( ".", d, handle::upcast( tester::rt->create( std::make_shared( std::move( tree ) ) ) ) ); } void test( shared_ptr rt ) @@ -34,14 +66,30 @@ void test( shared_ptr rt ) auto python_exec = tester::Compile( tester::File( "applications-prefix/src/applications-build/flatware/examples/python/python-fixpoint.wasm" ) ); + auto dir = filesys( "applications-prefix/src/applications-build/flatware/examples/python/python-workingdir" ); auto input = flatware_input( *tester::rt, tester::Limits(), python_exec, - fs(), + dir, tester::Tree( tester::Blob( "python" ), tester::Blob( "main.py" ) ) ); auto result_handle = tester::rt->execute( input ); auto result_tree = tester::rt->get( result_handle.try_into().value() ).value(); + auto fixstdout = handle::extract( result_tree->at( 2 ) ).value(); + auto fixstderr = handle::extract( result_tree->at( 3 ) ).value(); + + fixstdout.visit( + overload { [&]( Handle n ) { cerr << "stdout: " << tester::rt->get( n ).value()->data() << endl; }, + [&]( Handle l ) { cerr << "stdout: " << l.data() << endl; } } + + ); + + fixstderr.visit( + overload { [&]( Handle n ) { cerr << "stderr: " << tester::rt->get( n ).value()->data() << endl; }, + [&]( Handle l ) { cerr << "stderr: " << l.data() << endl; } } + + ); + uint32_t x = -1; memcpy( &x, handle::extract( result_tree->at( 0 ) ).value().data(), sizeof( uint32_t ) ); if ( x != 0 ) {