diff --git a/tests/server.py b/tests/server.py index cd0a8c71..d3aa8d15 100644 --- a/tests/server.py +++ b/tests/server.py @@ -81,7 +81,7 @@ def is_running(self): """ return self.running.is_set() - def _wait_events(self, events: list[Event], ignored: list[Event], strict: bool): + def _wait_events(self, events: list[Event], strict: bool): while self.is_running(): msg = self.get_next() if msg is None: @@ -89,9 +89,6 @@ def _wait_events(self, events: list[Event], ignored: list[Event], strict: bool): continue print(f'Got event: {msg}') - if msg in ignored: - raise ValueError(f'Caught ignored event: {msg}') - if msg in events: events.remove(msg) if len(events) == 0: @@ -99,7 +96,7 @@ def _wait_events(self, events: list[Event], ignored: list[Event], strict: bool): elif strict: raise ValueError(f'Encountered unexpected event: {msg}') - def wait_events(self, events: list[Event], ignored: list[Event] = [], strict: bool = False): + def wait_events(self, events: list[Event], strict: bool = True): """ Continuously checks the server for incoming events until the specified events are found. @@ -107,11 +104,11 @@ def wait_events(self, events: list[Event], ignored: list[Event] = [], strict: bo Args: server: The server instance to retrieve events from. event (list[Event]): The events to search for. - ignored (list[Event]): List of events that should not happen. strict (bool): Fail if an unexpected event is detected. Raises: TimeoutError: If the required events are not found in 5 seconds. """ - fs = self.executor.submit(self._wait_events, events, ignored, strict) + print('Waiting for events:', *events, sep='\n') + fs = self.executor.submit(self._wait_events, events, strict) fs.result(timeout=5) diff --git a/tests/test_config_hotreload.py b/tests/test_config_hotreload.py index 4b5d337d..9a875228 100644 --- a/tests/test_config_hotreload.py +++ b/tests/test_config_hotreload.py @@ -100,7 +100,6 @@ def test_output_grpc_address_change(fact, fact_config, monitored_dir, server, al process = Process.from_proc() e = Event(process=process, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') server.wait_events([e]) @@ -114,7 +113,6 @@ def test_output_grpc_address_change(fact, fact_config, monitored_dir, server, al e = Event(process=process, event_type=EventType.OPEN, file=fut, host_path='') - print(f'Waiting for event on alternate server: {e}') alternate_server.wait_events([e]) @@ -127,10 +125,6 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): with open(ignored_file, 'w') as f: f.write('This is to be ignored') - ignored_event = Event(process=p, event_type=EventType.CREATION, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test fut = os.path.join(monitored_dir, 'test2.txt') with open(fut, 'w') as f: @@ -138,9 +132,8 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): e = Event(process=p, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) config, config_file = fact_config config['paths'] = [ignored_dir] @@ -153,17 +146,12 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): e = Event(process=p, event_type=EventType.OPEN, file=ignored_file, host_path='') - print(f'Waiting for event: {e}') # File Under Test with open(fut, 'w') as f: f.write('This is another ignored event') - ignored_event = Event( - process=p, event_type=EventType.OPEN, file=fut, host_path='') - print(f'Ignoring: {ignored_event}') - - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): @@ -174,10 +162,6 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): with open(ignored_file, 'w') as f: f.write('This is to be ignored') - ignored_event = Event(process=p, event_type=EventType.CREATION, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test fut = os.path.join(monitored_dir, 'test2.txt') with open(fut, 'w') as f: @@ -185,9 +169,8 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): e = Event(process=p, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) config, config_file = fact_config config['paths'] = [monitored_dir, ignored_dir] @@ -205,6 +188,5 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): file=ignored_file, host_path=''), Event(process=p, event_type=EventType.OPEN, file=fut, host_path='') ] - print(f'Waiting for events: {events}') server.wait_events(events) diff --git a/tests/test_editors/test_nvim.py b/tests/test_editors/test_nvim.py index b61bdf1c..32175d42 100644 --- a/tests/test_editors/test_nvim.py +++ b/tests/test_editors/test_nvim.py @@ -20,9 +20,6 @@ def test_new_file(editor_container, server): file=fut, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -67,9 +64,6 @@ def test_open_file(editor_container, server): file=f'{fut}~', host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -92,9 +86,6 @@ def test_new_file_ovfs(editor_container, server): file=fut, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -145,7 +136,4 @@ def test_open_file_ovfs(editor_container, server): file=f'{fut}~', host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) diff --git a/tests/test_editors/test_sed.py b/tests/test_editors/test_sed.py index ed979f66..99accb88 100644 --- a/tests/test_editors/test_sed.py +++ b/tests/test_editors/test_sed.py @@ -34,9 +34,6 @@ def test_sed(vi_container, server): file=sed_tmp_file, host_path='', owner_uid=0, owner_gid=0), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -76,7 +73,4 @@ def test_sed_ovfs(vi_container, server): file=sed_tmp_file, host_path='', owner_uid=0, owner_gid=0), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) diff --git a/tests/test_editors/test_vi.py b/tests/test_editors/test_vi.py index e2e08f75..65c4ef06 100644 --- a/tests/test_editors/test_vi.py +++ b/tests/test_editors/test_vi.py @@ -35,9 +35,6 @@ def test_new_file(vi_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -82,9 +79,6 @@ def test_new_file_ovfs(vi_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -145,9 +139,6 @@ def test_open_file(vi_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -220,7 +211,4 @@ def test_open_file_ovfs(vi_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) diff --git a/tests/test_editors/test_vim.py b/tests/test_editors/test_vim.py index b3778c91..8919362b 100644 --- a/tests/test_editors/test_vim.py +++ b/tests/test_editors/test_vim.py @@ -33,9 +33,6 @@ def test_new_file(editor_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -78,9 +75,6 @@ def test_new_file_ovfs(editor_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -140,9 +134,6 @@ def test_open_file(editor_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) @@ -214,7 +205,4 @@ def test_open_file_ovfs(editor_container, server): file=swap_file, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events, strict=True) diff --git a/tests/test_file_open.py b/tests/test_file_open.py index 001aa4a3..07f7c227 100644 --- a/tests/test_file_open.py +++ b/tests/test_file_open.py @@ -1,7 +1,6 @@ import multiprocessing as mp import os -import docker import pytest from conftest import join_path_with_filename, path_to_string @@ -16,13 +15,12 @@ pytest.param('🚀rocket.txt', id='Emoji'), pytest.param(b'test\xff\xfe.txt', id='Invalid'), ]) -def test_open(fact, monitored_dir, server, filename): +def test_open(monitored_dir, server, filename): """ Tests the opening of a file and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. filename: Name of the file to create (includes UTF-8 test cases). @@ -38,18 +36,16 @@ def test_open(fact, monitored_dir, server, filename): e = Event(process=Process.from_proc(), event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {e}') server.wait_events([e]) -def test_multiple(fact, monitored_dir, server): +def test_multiple(monitored_dir, server): """ Tests the opening of multiple files and verifies that the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. filenames: List of filenames to create (includes UTF-8 test cases). @@ -62,21 +58,18 @@ def test_multiple(fact, monitored_dir, server): with open(fut, 'w') as f: f.write('This is a test') - e = Event(process=process, event_type=EventType.CREATION, - file=fut, host_path='') - print(f'Waiting for event: {e}') - events.append(e) + events.append( + Event(process=process, event_type=EventType.CREATION, file=fut, host_path='')) server.wait_events(events) -def test_multiple_access(fact, test_file, server): +def test_multiple_access(test_file, server): """ Tests multiple opening of a file and verifies that the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -85,21 +78,18 @@ def test_multiple_access(fact, test_file, server): with open(test_file, 'a+') as f: f.write('This is a test') - e = Event(process=Process.from_proc(), file=test_file, - host_path=test_file, event_type=EventType.OPEN) - print(f'Waiting for event: {e}') - events.append(e) + events.append(Event(process=Process.from_proc(), file=test_file, + host_path=test_file, event_type=EventType.OPEN)) server.wait_events(events) -def test_ignored(fact, test_file, ignored_dir, server): +def test_ignored(test_file, ignored_dir, server): """ Tests that open events on ignored files are not captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_file: Temporary file for testing. ignored_dir: Temporary directory path that is not monitored by fact. server: The server instance to communicate with. @@ -111,19 +101,14 @@ def test_ignored(fact, test_file, ignored_dir, server): with open(ignored_file, 'w') as f: f.write('This is to be ignored') - ignored_event = Event(process=p, event_type=EventType.CREATION, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test with open(test_file, 'w') as f: f.write('This is a test') e = Event(process=p, event_type=EventType.OPEN, file=test_file, host_path=test_file) - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def do_test(fut: str, stop_event: mp.Event): @@ -136,13 +121,12 @@ def do_test(fut: str, stop_event: mp.Event): stop_event.wait() -def test_external_process(fact, monitored_dir, server): +def test_external_process(monitored_dir, server): """ Tests the opening of a file by an external process and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -156,10 +140,8 @@ def test_external_process(fact, monitored_dir, server): creation = Event(process=p, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {creation}') write_access = Event( process=p, event_type=EventType.OPEN, file=fut, host_path='') - print(f'Waiting for event: {write_access}') try: server.wait_events([creation, write_access]) @@ -168,21 +150,19 @@ def test_external_process(fact, monitored_dir, server): proc.join(1) -def test_overlay(fact, test_container, server): +def test_overlay(test_container, server): # File Under Test fut = '/container-dir/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'touch {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) events = [ Event(process=process, event_type=EventType.CREATION, file=fut, host_path=''), @@ -190,51 +170,42 @@ def test_overlay(fact, test_container, server): file=fut, host_path='') ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_mounted_dir(fact, test_container, ignored_dir, server): +def test_mounted_dir(test_container, ignored_dir, server): # File Under Test fut = '/mounted/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'touch {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.CREATION, file=fut, host_path='') - print(f'Waiting for event: {event}') server.wait_events([event]) -def test_unmonitored_mounted_dir(fact, test_container, test_file, server): +def test_unmonitored_mounted_dir(test_container, test_file, server): # File Under Test fut = '/unmonitored/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'touch {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.OPEN, file=fut, host_path=test_file) - print(f'Waiting for event: {event}') server.wait_events([event]) diff --git a/tests/test_misc.py b/tests/test_misc.py index 71c662dd..a18814a4 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -40,7 +40,7 @@ def run_self_deleter(fact, monitored_dir, logs_dir, docker_client, build_self_de container.remove() -def test_d_path_sanitization(fact, monitored_dir, server, run_self_deleter, docker_client): +def test_d_path_sanitization(monitored_dir, server, run_self_deleter, docker_client): """ Ensure the sanitization of paths obtained by calling the bpf_d_path helper don't include the " (deleted)" suffix when the file is @@ -52,16 +52,13 @@ def test_d_path_sanitization(fact, monitored_dir, server, run_self_deleter, dock container = run_self_deleter - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/local/bin/self-deleter', - args=f'self-deleter {fut}', - name='self-deleter', - container_id=container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/local/bin/self-deleter', + args=f'self-deleter {fut}', + name='self-deleter', + container_id=container.id[:12], + ) event = Event(process=process, event_type=EventType.OPEN, file=fut, host_path=host_path) - print(f'Waiting for event: {event}') server.wait_events([event]) diff --git a/tests/test_path_chmod.py b/tests/test_path_chmod.py index b62391cc..0573741a 100644 --- a/tests/test_path_chmod.py +++ b/tests/test_path_chmod.py @@ -15,13 +15,12 @@ '🔒secure.txt', b'perm\xff\xfe.txt', ]) -def test_chmod(fact, monitored_dir, server, filename): +def test_chmod(monitored_dir, server, filename): """ Tests changing permissions on a file and verifies the corresponding event is captured by the server Args: - fact: Fixture for file activity (only required to be runing). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. filename: Name of the file to create (includes UTF-8 test cases). @@ -47,18 +46,14 @@ def test_chmod(fact, monitored_dir, server, filename): file=fut, host_path='', mode=mode), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_multiple(fact, monitored_dir, server): +def test_multiple(monitored_dir, server): """ Tests modifying permissions on multiple files. Args: - fact: Fixture for file activity (only required to be runing). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -82,12 +77,11 @@ def test_multiple(fact, monitored_dir, server): server.wait_events(events) -def test_ignored(fact, test_file, ignored_dir, server): +def test_ignored(test_file, ignored_dir, server): """ Tests that permission events on ignored files are not captured. Args: - fact: Fixture for file activity (only required to be running). test_file: File monitored on the host, mounted to the container. ignored_dir: Temporary directory path that is not monitored by fact. server: The server instance to communicate with. @@ -101,18 +95,13 @@ def test_ignored(fact, test_file, ignored_dir, server): f.write('This is to be ignored') os.chmod(ignored_file, mode) - ignored_event = Event(process=process, event_type=EventType.PERMISSION, - file=ignored_file, host_path='', mode=mode) - print(f'Ignoring: {ignored_event}') - # File Under Test os.chmod(test_file, mode) e = Event(process=process, event_type=EventType.PERMISSION, file=test_file, host_path=test_file, mode=mode) - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def do_test(fut: str, mode: int, stop_event: mp.Event): @@ -124,13 +113,12 @@ def do_test(fut: str, mode: int, stop_event: mp.Event): stop_event.wait() -def test_external_process(fact, monitored_dir, server): +def test_external_process(monitored_dir, server): """ Tests permission change of a file by an external process and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -142,23 +130,25 @@ def test_external_process(fact, monitored_dir, server): proc.start() process = Process.from_proc(proc.pid) - event = Event(process=process, event_type=EventType.PERMISSION, - file=fut, host_path='', mode=mode) - print(f'Waiting for event: {event}') + events = [ + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path='', mode=mode), + Event(process=process, event_type=EventType.PERMISSION, + file=fut, host_path='', mode=mode), + ] try: - server.wait_events([event]) + server.wait_events(events) finally: stop_event.set() proc.join(1) -def test_overlay(fact, test_container, server): +def test_overlay(test_container, server): """ Test permission changes on an overlayfs file (inside a container) Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ @@ -170,23 +160,18 @@ def test_overlay(fact, test_container, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'chmod {mode} {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chmod = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chmod', - args=f'chmod {mode} {fut}', - name='chmod', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + chmod = Process.in_container( + exe_path='/usr/bin/chmod', + args=f'chmod {mode} {fut}', + name='chmod', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -196,18 +181,14 @@ def test_overlay(fact, test_container, server): file=fut, host_path='', mode=int(mode, 8)), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_mounted_dir(fact, test_container, ignored_dir, server): +def test_mounted_dir(test_container, ignored_dir, server): """ Test permission changes on a file bind mounted into a container Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. ignored_dir: This directory is ignored on the host, and mounted to the container. server: The server instance to communicate with. @@ -220,23 +201,18 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'chmod {mode} {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chmod = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chmod', - args=f'chmod {mode} {fut}', - name='chmod', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + chmod = Process.in_container( + exe_path='/usr/bin/chmod', + args=f'chmod {mode} {fut}', + name='chmod', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -244,19 +220,15 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): host_path='', mode=int(mode, 8)), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_unmonitored_mounted_dir(fact, test_container, test_file, server): +def test_unmonitored_mounted_dir(test_container, test_file, server): """ Test permission changes on a file bind mounted to a container and monitored on the host. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. test_file: File monitored on the host, mounted to the container. server: The server instance to communicate with. @@ -270,16 +242,13 @@ def test_unmonitored_mounted_dir(fact, test_container, test_file, server): # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'chmod {mode} {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chmod', - args=f'chmod {mode} {fut}', - name='chmod', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/chmod', + args=f'chmod {mode} {fut}', + name='chmod', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.PERMISSION, file=fut, host_path=test_file, mode=int(mode, 8)) - print(f'Waiting for event: {event}') server.wait_events([event]) diff --git a/tests/test_path_chown.py b/tests/test_path_chown.py index 9e4daf12..189a2af2 100644 --- a/tests/test_path_chown.py +++ b/tests/test_path_chown.py @@ -22,13 +22,12 @@ '👤owner.txt', b'own\xff\xfe.txt', ]) -def test_chown(fact, test_container, server, filename): +def test_chown(test_container, server, filename): """ Execute a chown operation on a file and verifies the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. filename: Name of the file to create (includes UTF-8 test cases). @@ -47,48 +46,40 @@ def test_chown(fact, test_container, server, filename): touch_cmd = f'touch {fut}' chown_cmd = f'chown {TEST_UID}:{TEST_GID} {fut}' - loginuid = pow(2, 32) - 1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + chown = Process.in_container( + exe_path='/usr/bin/chown', + args=chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), + Event(process=touch, event_type=EventType.OPEN, file=fut, + host_path=''), Event(process=chown, event_type=EventType.OWNERSHIP, file=fut, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_multiple(fact, test_container, server): +def test_multiple(test_container, server): """ Tests ownership operations on multiple files and verifies the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ events = [] - loginuid = pow(2, 32) - 1 # File Under Test for i in range(3): @@ -98,48 +89,40 @@ def test_multiple(fact, test_container, server): test_container.exec_run(touch_cmd) test_container.exec_run(chown_cmd) - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + chown = Process.in_container( + exe_path='/usr/bin/chown', + args=chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) events.extend([ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), + Event(process=touch, event_type=EventType.OPEN, file=fut, + host_path=''), Event(process=chown, event_type=EventType.OWNERSHIP, file=fut, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID), ]) - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_ignored(fact, test_container, server): +def test_ignored(test_container, server): """ Tests that ownership events on ignored files are not captured by the server. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ - loginuid = pow(2, 32) - 1 - ignored_file = '/test.txt' monitored_file = '/container-dir/test.txt' @@ -153,78 +136,35 @@ def test_ignored(fact, test_container, server): test_container.exec_run(monitored_touch_cmd) test_container.exec_run(monitored_chown_cmd) - ignored_touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=ignored_touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - ignored_chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=ignored_chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) - reported_touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=monitored_touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - reported_chown = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=monitored_chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) - ignored_events = [ - Event(process=ignored_touch, - event_type=EventType.CREATION, - file=ignored_file, - host_path=''), - Event(process=ignored_chown, - event_type=EventType.OWNERSHIP, - file=ignored_file, - host_path='', - owner_uid=TEST_UID, - owner_gid=TEST_GID), - ] - expected_events = [ - Event(process=reported_touch, - event_type=EventType.CREATION, - file=monitored_file, - host_path=''), - Event(process=reported_chown, - event_type=EventType.OWNERSHIP, - file=monitored_file, - host_path='', - owner_uid=TEST_UID, - owner_gid=TEST_GID), + reported_touch = Process.in_container( + exe_path='/usr/bin/touch', + args=monitored_touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + reported_chown = Process.in_container( + exe_path='/usr/bin/chown', + args=monitored_chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) + events = [ + Event(process=reported_touch, event_type=EventType.CREATION, + file=monitored_file, host_path=''), + Event(process=reported_touch, event_type=EventType.OPEN, + file=monitored_file, host_path=''), + Event(process=reported_chown, event_type=EventType.OWNERSHIP, + file=monitored_file, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID), ] - for e in ignored_events: - print(f'Events that should be ignored: {e}') - - for e in expected_events: - print(f'Waiting for event: {e}') + server.wait_events(events=events) - server.wait_events(events=expected_events, ignored=ignored_events) - -def test_no_change(fact, test_container, server): +def test_no_change(test_container, server): """ Tests that chown to the same UID/GID triggers events for all calls. Args: - fact: Fixture for file activity (only required to be running). test_container: A container for running commands in. server: The server instance to communicate with. """ @@ -243,44 +183,29 @@ def test_no_change(fact, test_container, server): # Second chown to the same UID/GID - this should ALSO trigger an event test_container.exec_run(chown_cmd) - loginuid = pow(2, 32) - 1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=touch_cmd, - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - chown = [ - Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid), - Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/chown', - args=chown_cmd, - name='chown', - container_id=test_container.id[:12], - loginuid=loginuid) - ] - + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=touch_cmd, + name='touch', + container_id=test_container.id[:12], + ) + chown = Process.in_container( + exe_path='/usr/bin/chown', + args=chown_cmd, + name='chown', + container_id=test_container.id[:12], + ) + chown_event = Event(process=chown, event_type=EventType.OWNERSHIP, + file=fut, host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID) + # Expect both chown events (all calls to chown trigger events) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), - *(Event(process=p, event_type=EventType.OWNERSHIP, file=fut, - host_path='', owner_uid=TEST_UID, owner_gid=TEST_GID) for p in chown), + Event(process=touch, event_type=EventType.OPEN, file=fut, + host_path=''), + chown_event, + chown_event, ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) - diff --git a/tests/test_path_unlink.py b/tests/test_path_unlink.py index beafed0c..36313659 100644 --- a/tests/test_path_unlink.py +++ b/tests/test_path_unlink.py @@ -16,13 +16,12 @@ '🗑️delete.txt', b'rm\xff\xfe.txt', ]) -def test_remove(fact, monitored_dir, server, filename): +def test_remove(monitored_dir, server, filename): """ Tests the removal of a file and verifies the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. filename: Name of the file to create and remove (includes UTF-8 test cases). @@ -53,13 +52,12 @@ def test_remove(fact, monitored_dir, server, filename): server.wait_events(events) -def test_multiple(fact, monitored_dir, server): +def test_multiple(monitored_dir, server): """ Tests the removal of multiple files and verifies the corresponding events are captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for monitoring the test file. server: The server instance to communicate with. """ @@ -83,13 +81,12 @@ def test_multiple(fact, monitored_dir, server): server.wait_events(events) -def test_ignored(fact, test_file, ignored_dir, server): +def test_ignored(test_file, ignored_dir, server): """ Tests that unlink events on ignored files are not captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. ignored_dir: Temporary directory path that is not monitored by fact. server: The server instance to communicate with. @@ -102,18 +99,13 @@ def test_ignored(fact, test_file, ignored_dir, server): f.write('This is to be ignored') os.remove(ignored_file) - ignored_event = Event(process=process, event_type=EventType.UNLINK, - file=ignored_file, host_path='') - print(f'Ignoring: {ignored_event}') - # File Under Test os.remove(test_file) e = Event(process=process, event_type=EventType.UNLINK, file=test_file, host_path=test_file) - print(f'Waiting for event: {e}') - server.wait_events([e], ignored=[ignored_event]) + server.wait_events([e]) def do_test(fut: str, stop_event: mp.Event): @@ -125,13 +117,12 @@ def do_test(fut: str, stop_event: mp.Event): stop_event.wait() -def test_external_process(fact, monitored_dir, server): +def test_external_process(monitored_dir, server): """ Tests the removal of a file by an external process and verifies that the corresponding event is captured by the server. Args: - fact: Fixture for file activity (only required to be running). monitored_dir: Temporary directory path for creating the test file. server: The server instance to communicate with. """ @@ -142,18 +133,21 @@ def test_external_process(fact, monitored_dir, server): proc.start() process = Process.from_proc(proc.pid) - removal = Event(process=process, event_type=EventType.UNLINK, - file=fut, host_path='') - print(f'Waiting for event: {removal}') + events = [ + Event(process=process, event_type=EventType.CREATION, + file=fut, host_path=''), + Event(process=process, event_type=EventType.UNLINK, + file=fut, host_path=''), + ] try: - server.wait_events([removal]) + server.wait_events(events) finally: stop_event.set() proc.join(1) -def test_overlay(fact, test_container, server): +def test_overlay(test_container, server): # File Under Test fut = '/container-dir/test.txt' @@ -161,23 +155,18 @@ def test_overlay(fact, test_container, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'rm {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - rm = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/rm', - args=f'rm {fut}', - name='rm', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + rm = Process.in_container( + exe_path='/usr/bin/rm', + args=f'rm {fut}', + name='rm', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -187,13 +176,10 @@ def test_overlay(fact, test_container, server): file=fut, host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_mounted_dir(fact, test_container, ignored_dir, server): +def test_mounted_dir(test_container, ignored_dir, server): # File Under Test fut = '/mounted/test.txt' @@ -201,23 +187,18 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): test_container.exec_run(f'touch {fut}') test_container.exec_run(f'rm {fut}') - loginuid = pow(2, 32)-1 - touch = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/touch', - args=f'touch {fut}', - name='touch', - container_id=test_container.id[:12], - loginuid=loginuid) - rm = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/rm', - args=f'rm {fut}', - name='rm', - container_id=test_container.id[:12], - loginuid=loginuid) + touch = Process.in_container( + exe_path='/usr/bin/touch', + args=f'touch {fut}', + name='touch', + container_id=test_container.id[:12], + ) + rm = Process.in_container( + exe_path='/usr/bin/rm', + args=f'rm {fut}', + name='rm', + container_id=test_container.id[:12], + ) events = [ Event(process=touch, event_type=EventType.CREATION, file=fut, host_path=''), @@ -225,29 +206,23 @@ def test_mounted_dir(fact, test_container, ignored_dir, server): host_path=''), ] - for e in events: - print(f'Waiting for event: {e}') - server.wait_events(events) -def test_unmonitored_mounted_dir(fact, test_container, test_file, server): +def test_unmonitored_mounted_dir(test_container, test_file, server): # File Under Test fut = '/unmonitored/test.txt' # Create the exec and an equivalent event that it will trigger test_container.exec_run(f'rm {fut}') - process = Process(pid=None, - uid=0, - gid=0, - exe_path='/usr/bin/rm', - args=f'rm {fut}', - name='rm', - container_id=test_container.id[:12], - loginuid=pow(2, 32)-1) + process = Process.in_container( + exe_path='/usr/bin/rm', + args=f'rm {fut}', + name='rm', + container_id=test_container.id[:12], + ) event = Event(process=process, event_type=EventType.UNLINK, file=fut, host_path=test_file) - print(f'Waiting for event: {event}') server.wait_events([event])