From 48f7c8b312288450c2ed02a7bf0dff2f5c834095 Mon Sep 17 00:00:00 2001 From: Gavin Mak Date: Fri, 13 Jun 2025 17:53:38 -0700 Subject: [PATCH] sync: Implement --interleaved sync worker For each assigned project, the worker sequentially calls Sync_NetworkHalf and Sync_LocalHalf, respecting --local-only and --network-only flags. To prevent scrambled progress bars, all stderr output from the checkout phase is captured (shown with --verbose). Result objects now carry status and timing information from the worker for state updates. Bug: 421935613 Change-Id: I398602e08a375e974a8914e5fa48ffae673dda9b --- progress.py | 15 ++ subcmds/sync.py | 436 ++++++++++++++++++++++++++++++++----- tests/test_subcmds_sync.py | 181 ++++++++++++++- 3 files changed, 579 insertions(+), 53 deletions(-) diff --git a/progress.py b/progress.py index fe246c744..a386f426d 100644 --- a/progress.py +++ b/progress.py @@ -195,6 +195,21 @@ class Progress: ) ) + def display_message(self, msg): + """Clears the current progress line and prints a message above it. + + The progress bar is then redrawn on the next line. + """ + if not _TTY or IsTraceToStderr() or self._quiet: + return + + # Erase the current line, print the message with a newline, + # and then immediately redraw the progress bar on the new line. + sys.stderr.write("\r" + CSI_ERASE_LINE) + sys.stderr.write(msg + "\n") + sys.stderr.flush() + self.update(inc=0) + def end(self): self._update_event.set() if not _TTY or IsTraceToStderr() or self._quiet: diff --git a/subcmds/sync.py b/subcmds/sync.py index f5eaf32dd..ba6fcfe05 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import contextlib import functools import http.cookiejar as cookielib import io @@ -198,6 +199,29 @@ class _SyncResult(NamedTuple): """Individual project sync result for interleaved mode. Attributes: +<<<<<<< PATCH SET (ca698b sync: Implement --interleaved sync worker) + project_index: The index of the project in the shared list. + relpath: The relative path of the project from the top of the repo client. + fetch_success: True if the fetch operation was successful. + remote_fetched: True if the remote was actually queried. + fetch_error: The Exception object if the fetch failed, else None. + fetch_start: A time.time() timestamp of when the fetch started. + fetch_finish: A time.time() timestamp of when the fetch finished. + checkout_success: True if the checkout operation was successful. + checkout_error: The Exception object if the checkout failed, else None. + checkout_start: A time.time() timestamp of when the checkout started. + checkout_finish: A time.time() timestamp of when the checkout +||||||| BASE + relpath: The relative path of the project from the top of the repo client. + fetch_success: True if the fetch operation was successful. + checkout_success: True if the checkout operation was successful. + fetch_error: The Exception object if the fetch failed, else None. + checkout_error: The Exception object if the checkout failed, else None. + fetch_start: A time.time() timestamp of when the fetch started. + fetch_finish: A time.time() timestamp of when the fetch finished. + checkout_start: A time.time() timestamp of when the checkout started. + checkout_finish: A time.time() timestamp of when the checkout +======= relpath (str): The project's relative path from the repo client top. fetch_success (bool): True if the fetch operation was successful. checkout_success (bool): True if the checkout operation was @@ -211,20 +235,27 @@ class _SyncResult(NamedTuple): checkout_start (Optional[float]): The time.time() when checkout started. checkout_finish (Optional[float]): The time.time() when checkout +>>>>>>> BASE (205433 sync: Add orchestration logic for --interleaved) finished. + stderr_text: The combined output from both fetch and checkout operations. """ + project_index: int relpath: str - fetch_success: bool - checkout_success: bool - fetch_error: Optional[Exception] - checkout_error: Optional[Exception] + fetch_success: bool + remote_fetched: bool + fetch_error: Optional[Exception] fetch_start: Optional[float] fetch_finish: Optional[float] + + checkout_success: bool + checkout_error: Optional[Exception] checkout_start: Optional[float] checkout_finish: Optional[float] + stderr_text: str + class _InterleavedSyncResult(NamedTuple): """Result of an interleaved sync. @@ -996,6 +1027,7 @@ later is required to fix a server side protocol bug. err_event.set() # Call self update, unless requested not to + # TODO(b/42193561): Extract repo update logic to ExecuteHelper. if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0": _PostRepoFetch(rp, opt.repo_verify) if opt.network_only: @@ -1176,6 +1208,16 @@ later is required to fix a server side protocol bug. self._local_sync_state.Save() return proc_res and not err_results + def _PrintManifestNotices(self, opt): + """Print all manifest notices, but only once.""" + printed_notices = set() + # Print all manifest notices, but only once. + # Sort by path_prefix to ensure consistent ordering. + for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix): + if m.notice and m.notice not in printed_notices: + print(m.notice) + printed_notices.add(m.notice) + @staticmethod def _GetPreciousObjectsState(project: Project, opt): """Get the preciousObjects state for the project. @@ -2032,14 +2074,7 @@ later is required to fix a server side protocol bug. if err_checkout: err_event.set() - printed_notices = set() - # If there's a notice that's supposed to print at the end of the sync, - # print it now... But avoid printing duplicate messages, and preserve - # order. - for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix): - if m.notice and m.notice not in printed_notices: - print(m.notice) - printed_notices.add(m.notice) + self._PrintManifestNotices(opt) # If we saw an error, exit with code 1 so that other scripts can check. if err_event.is_set(): @@ -2068,7 +2103,151 @@ later is required to fix a server side protocol bug. raise SyncError(aggregate_errors=errors) @classmethod +<<<<<<< PATCH SET (ca698b sync: Implement --interleaved sync worker) + def _SyncOneProject(cls, opt, project_index, project) -> _SyncResult: + """Syncs a single project for interleaved sync.""" + fetch_success = False + remote_fetched = False + fetch_error = None + fetch_start = None + fetch_finish = None + network_output = "" + + if opt.local_only: + fetch_success = True + else: + fetch_start = time.time() + network_output_capture = io.StringIO() + try: + ssh_proxy = cls.get_parallel_context().get("ssh_proxy") + sync_result = project.Sync_NetworkHalf( + quiet=opt.quiet, + verbose=opt.verbose, + output_redir=network_output_capture, + current_branch_only=cls._GetCurrentBranchOnly( + opt, project.manifest + ), + force_sync=opt.force_sync, + clone_bundle=opt.clone_bundle, + tags=opt.tags, + archive=project.manifest.IsArchive, + optimized_fetch=opt.optimized_fetch, + retry_fetches=opt.retry_fetches, + prune=opt.prune, + ssh_proxy=ssh_proxy, + clone_filter=project.manifest.CloneFilter, + partial_clone_exclude=project.manifest.PartialCloneExclude, + clone_filter_for_depth=project.manifest.CloneFilterForDepth, + ) + fetch_success = sync_result.success + remote_fetched = sync_result.remote_fetched + if sync_result.error: + fetch_error = sync_result.error + except KeyboardInterrupt: + logger.error( + "Keyboard interrupt while processing %s", project.name + ) + except GitError as e: + fetch_error = e + logger.error("error.GitError: Cannot fetch %s", e) + except Exception as e: + fetch_error = e + logger.error( + "error: Cannot fetch %s (%s: %s)", + project.name, + type(e).__name__, + e, + ) + finally: + fetch_finish = time.time() + network_output = network_output_capture.getvalue() + + checkout_success = False + checkout_error = None + checkout_start = None + checkout_finish = None + checkout_stderr = "" + + if fetch_success and not opt.network_only: + checkout_start = time.time() + stderr_capture = io.StringIO() + try: + with contextlib.redirect_stderr(stderr_capture): + syncbuf = SyncBuffer( + project.manifest.manifestProject.config, + detach_head=opt.detach_head, + ) + local_half_errors = [] + project.Sync_LocalHalf( + syncbuf, + force_sync=opt.force_sync, + force_checkout=opt.force_checkout, + force_rebase=opt.rebase, + errors=local_half_errors, + verbose=opt.verbose, + ) + checkout_success = syncbuf.Finish() + if local_half_errors: + checkout_error = SyncError( + aggregate_errors=local_half_errors + ) + except KeyboardInterrupt: + logger.error( + "Keyboard interrupt while processing %s", project.name + ) + except GitError as e: + checkout_error = e + logger.error( + "error.GitError: Cannot checkout %s: %s", project.name, e + ) + except Exception as e: + checkout_error = e + logger.error( + "error: Cannot checkout %s: %s: %s", + project.name, + type(e).__name__, + e, + ) + finally: + checkout_finish = time.time() + checkout_stderr = stderr_capture.getvalue() + elif fetch_success: + checkout_success = True + + # Consolidate all captured output. + captured_parts = [] + if network_output: + captured_parts.append(network_output) + if checkout_stderr: + captured_parts.append(checkout_stderr) + stderr_text = "\n".join(captured_parts) + + return _SyncResult( + project_index=project_index, + relpath=project.relpath, + fetch_success=fetch_success, + remote_fetched=remote_fetched, + checkout_success=checkout_success, + fetch_error=fetch_error, + checkout_error=checkout_error, + stderr_text=stderr_text.strip(), + fetch_start=fetch_start, + fetch_finish=fetch_finish, + checkout_start=checkout_start, + checkout_finish=checkout_finish, + ) + + @classmethod + def _InterleavedSyncWorker( + cls, opt, project_indices + ) -> _InterleavedSyncResult: +||||||| BASE + def _InterleavedSyncWorker( + cls, opt, project_indices + ) -> _InterleavedSyncResult: +======= def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult: +>>>>>>> BASE (205433 sync: Add orchestration logic for --interleaved) """Worker for interleaved sync. This function is responsible for syncing a group of projects that share @@ -2092,27 +2271,12 @@ later is required to fix a server side protocol bug. # Use the first project as the representative for the progress bar. first_project = projects[project_indices[0]] key = f"{first_project.name} @ {first_project.relpath}" - start_time = time.time() - sync_dict[key] = start_time + sync_dict[key] = time.time() try: for idx in project_indices: project = projects[idx] - # For now, simulate a successful sync. - # TODO(b/421935613): Perform the actual git fetch and checkout. - results.append( - _SyncResult( - relpath=project.relpath, - fetch_success=True, - checkout_success=True, - fetch_error=None, - checkout_error=None, - fetch_start=None, - fetch_finish=None, - checkout_start=None, - checkout_finish=None, - ) - ) + results.append(cls._SyncOneProject(opt, idx, project)) finally: del sync_dict[key] @@ -2130,9 +2294,39 @@ later is required to fix a server side protocol bug. ): """Callback to process results from interleaved sync workers.""" ret = True + projects = self.get_parallel_context()["projects"] for result_group in results_sets: for result in result_group.results: pm.update() + project = projects[result.project_index] + + if opt.verbose and result.stderr_text: + pm.display_message(result.stderr_text) + + if result.fetch_start: + self._fetch_times.Set( + project, + result.fetch_finish - result.fetch_start, + ) + self._local_sync_state.SetFetchTime(project) + self.event_log.AddSync( + project, + event_log.TASK_SYNC_NETWORK, + result.fetch_start, + result.fetch_finish, + result.fetch_success, + ) + if result.checkout_start: + if result.checkout_success: + self._local_sync_state.SetCheckoutTime(project) + self.event_log.AddSync( + project, + event_log.TASK_SYNC_LOCAL, + result.checkout_start, + result.checkout_finish, + result.checkout_success, + ) + if result.fetch_success and result.checkout_success: synced_relpaths.add(result.relpath) else: @@ -2187,25 +2381,81 @@ later is required to fix a server side protocol bug. sync_event = _threading.Event() sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) +<<<<<<< PATCH SET (ca698b sync: Implement --interleaved sync worker) + with multiprocessing.Manager() as manager, ssh.ProxyManager( + manager + ) as ssh_proxy: + ssh_proxy.sock() + with self.ParallelContext(): + self.get_parallel_context()["ssh_proxy"] = ssh_proxy + self.get_parallel_context()[ + "sync_dict" + ] = multiprocessing.Manager().dict() + sync_progress_thread.start() +||||||| BASE + with self.ParallelContext(): + self.get_parallel_context()[ + "sync_dict" + ] = multiprocessing.Manager().dict() + sync_progress_thread.start() +======= with self.ParallelContext(): # TODO(gavinmak): Use multprocessing.Queue instead of dict. self.get_parallel_context()[ "sync_dict" ] = multiprocessing.Manager().dict() sync_progress_thread.start() +>>>>>>> BASE (205433 sync: Add orchestration logic for --interleaved) - try: - # Outer loop for dynamic project discovery (e.g., submodules). - # It continues until no unsynced projects remain. - while True: - projects_to_sync = [ - p - for p in project_list - if p.relpath not in synced_relpaths - ] - if not projects_to_sync: + try: + # Outer loop for dynamic project discovery. This continues + # until no unsynced projects remain. + while True: + projects_to_sync = [ + p + for p in project_list + if p.relpath not in synced_relpaths + ] + if not projects_to_sync: + break + +<<<<<<< PATCH SET (ca698b sync: Implement --interleaved sync worker) + pending_relpaths = {p.relpath for p in projects_to_sync} + if previously_pending_relpaths == pending_relpaths: + logger.warning( + "Stall detected in interleaved sync, not all " + "projects could be synced." +||||||| BASE + pending_relpaths = {p.relpath for p in projects_to_sync} + if previously_pending_relpaths == pending_relpaths: + logger.warning( + "Stall detected in interleaved sync, not all " + "projects could be synced." + ) + err_event.set() break + previously_pending_relpaths = pending_relpaths + # Update the projects list for workers in the current pass. + self.get_parallel_context()["projects"] = projects_to_sync + project_index_map = { + p: i for i, p in enumerate(projects_to_sync) + } + + # Inner loop to process projects in a hierarchical order. + # This iterates through levels of project dependencies (e.g. + # 'foo' then 'foo/bar'). All projects in one level can be + # processed in parallel, but we must wait for a level to + # complete before starting the next. + for level_projects in _SafeCheckoutOrder(projects_to_sync): + if not level_projects: + continue + + objdir_project_map = collections.defaultdict(list) + for p in level_projects: + objdir_project_map[p.objdir].append( + project_index_map[p] +======= pending_relpaths = {p.relpath for p in projects_to_sync} if previously_pending_relpaths == pending_relpaths: logger.error( @@ -2235,7 +2485,32 @@ later is required to fix a server side protocol bug. for p in level_projects: objdir_project_map[p.objdir].append( project_index_map[p] +>>>>>>> BASE (205433 sync: Add orchestration logic for --interleaved) ) +<<<<<<< PATCH SET (ca698b sync: Implement --interleaved sync worker) +||||||| BASE + + work_items = list(objdir_project_map.values()) + if not work_items: + continue + + jobs = max(1, min(opt.jobs, len(work_items))) + callback = functools.partial( + self._ProcessSyncInterleavedResults, + synced_relpaths, + err_event, + errors, + opt, + ) + if not self.ExecuteInParallel( + jobs, + functools.partial(self._InterleavedSyncWorker, opt), + work_items, + callback=callback, + output=pm, + chunksize=1, + ): +======= work_items = list(objdir_project_map.values()) if not work_items: @@ -2257,26 +2532,83 @@ later is required to fix a server side protocol bug. output=pm, chunksize=1, ): +>>>>>>> BASE (205433 sync: Add orchestration logic for --interleaved) err_event.set() + break + previously_pending_relpaths = pending_relpaths - if err_event.is_set() and opt.fail_fast: - raise SyncFailFastError(aggregate_errors=errors) + self.get_parallel_context()[ + "projects" + ] = projects_to_sync + project_index_map = { + p: i for i, p in enumerate(projects_to_sync) + } - self._ReloadManifest(None, manifest) - project_list = self.GetProjects( - args, - missing_ok=True, - submodules_ok=opt.fetch_submodules, - manifest=manifest, - all_manifests=not opt.this_manifest_only, - ) - finally: - sync_event.set() - sync_progress_thread.join() + # Inner loop to process projects in a hierarchical + # order. This iterates through levels of project + # dependencies (e.g. 'foo' then 'foo/bar'). All projects + # in one level can be processed in parallel, but we must + # wait for a level to complete before starting the next. + for level_projects in _SafeCheckoutOrder( + projects_to_sync + ): + if not level_projects: + continue + + objdir_project_map = collections.defaultdict(list) + for p in level_projects: + objdir_project_map[p.objdir].append( + project_index_map[p] + ) + + work_items = list(objdir_project_map.values()) + if not work_items: + continue + + jobs = max(1, min(opt.jobs, len(work_items))) + callback = functools.partial( + self._ProcessSyncInterleavedResults, + synced_relpaths, + err_event, + errors, + opt, + ) + if not self.ExecuteInParallel( + jobs, + functools.partial( + self._InterleavedSyncWorker, opt + ), + work_items, + callback=callback, + output=pm, + chunksize=1, + ): + err_event.set() + + if err_event.is_set() and opt.fail_fast: + raise SyncFailFastError(aggregate_errors=errors) + + self._ReloadManifest(None, manifest) + project_list = self.GetProjects( + args, + missing_ok=True, + submodules_ok=opt.fetch_submodules, + manifest=manifest, + all_manifests=not opt.this_manifest_only, + ) + finally: + sync_event.set() + sync_progress_thread.join() pm.end() + # TODO(b/421935613): Add the manifest loop block from PhasedSync. + if not self.outer_client.manifest.IsArchive: + self._GCProjects(project_list, opt, err_event) + + self._PrintManifestNotices(opt) if err_event.is_set(): + # TODO(b/421935613): Log errors better like SyncPhased. logger.error( "error: Unable to fully sync the tree in interleaved mode." ) diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index 60f283afe..e7e31792c 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py @@ -310,6 +310,16 @@ class FakeProject: self.name = name or relpath self.objdir = objdir or relpath + self.use_git_worktrees = False + self.UseAlternates = False + self.manifest = mock.MagicMock() + self.manifest.GetProjectsWithName.return_value = [self] + self.config = mock.MagicMock() + self.EnableRepositoryExtension = mock.MagicMock() + + def RelPath(self, local=None): + return self.relpath + def __str__(self): return f"project: {self.relpath}" @@ -531,7 +541,11 @@ class InterleavedSyncTest(unittest.TestCase): self.manifest.CloneBundle = False self.manifest.default.sync_j = 1 - self.cmd = sync.Sync(manifest=self.manifest) + self.outer_client = mock.MagicMock() + self.outer_client.manifest.IsArchive = False + self.cmd = sync.Sync( + manifest=self.manifest, outer_client=self.outer_client + ) self.cmd.outer_manifest = self.manifest # Mock projects. @@ -549,6 +563,21 @@ class InterleavedSyncTest(unittest.TestCase): mock.patch.object(sync, "_PostRepoUpgrade").start() mock.patch.object(sync, "_PostRepoFetch").start() + # Mock parallel context for worker tests. + self.parallel_context_patcher = mock.patch( + "subcmds.sync.Sync.get_parallel_context" + ) + self.mock_get_parallel_context = self.parallel_context_patcher.start() + self.sync_dict = {} + self.mock_context = { + "projects": [], + "sync_dict": self.sync_dict, + } + self.mock_get_parallel_context.return_value = self.mock_context + + # Mock _GetCurrentBranchOnly for worker tests. + mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start() + def tearDown(self): """Clean up resources.""" shutil.rmtree(self.repodir) @@ -635,3 +664,153 @@ class InterleavedSyncTest(unittest.TestCase): work_items_sets = {frozenset(item) for item in work_items} expected_sets = {frozenset([0, 2]), frozenset([1])} self.assertEqual(work_items_sets, expected_sets) + + def _get_opts(self, args=None): + """Helper to get default options for worker tests.""" + if args is None: + args = ["--interleaved"] + opt, _ = self.cmd.OptionParser.parse_args(args) + # Set defaults for options used by the worker. + opt.quiet = True + opt.verbose = False + opt.force_sync = False + opt.clone_bundle = False + opt.tags = False + opt.optimized_fetch = False + opt.retry_fetches = 0 + opt.prune = False + opt.detach_head = False + opt.force_checkout = False + opt.rebase = False + return opt + + def test_worker_successful_sync(self): + """Test _InterleavedSyncWorker with a successful fetch and checkout.""" + opt = self._get_opts() + project = self.projA + project.Sync_NetworkHalf = mock.Mock( + return_value=SyncNetworkHalfResult(error=None, remote_fetched=True) + ) + project.Sync_LocalHalf = mock.Mock() + project.manifest.manifestProject.config = mock.MagicMock() + self.mock_context["projects"] = [project] + + with mock.patch("subcmds.sync.SyncBuffer") as mock_sync_buffer: + mock_sync_buf_instance = mock.MagicMock() + mock_sync_buf_instance.Finish.return_value = True + mock_sync_buffer.return_value = mock_sync_buf_instance + + result_obj = self.cmd._InterleavedSyncWorker(opt, [0]) + + self.assertEqual(len(result_obj.results), 1) + result = result_obj.results[0] + self.assertTrue(result.fetch_success) + self.assertTrue(result.checkout_success) + self.assertIsNone(result.fetch_error) + self.assertIsNone(result.checkout_error) + project.Sync_NetworkHalf.assert_called_once() + project.Sync_LocalHalf.assert_called_once() + + def test_worker_fetch_fails(self): + """Test _InterleavedSyncWorker with a failed fetch.""" + opt = self._get_opts() + project = self.projA + fetch_error = GitError("Fetch failed") + project.Sync_NetworkHalf = mock.Mock( + return_value=SyncNetworkHalfResult( + error=fetch_error, remote_fetched=False + ) + ) + project.Sync_LocalHalf = mock.Mock() + self.mock_context["projects"] = [project] + + result_obj = self.cmd._InterleavedSyncWorker(opt, [0]) + result = result_obj.results[0] + + self.assertFalse(result.fetch_success) + self.assertFalse(result.checkout_success) + self.assertEqual(result.fetch_error, fetch_error) + self.assertIsNone(result.checkout_error) + project.Sync_NetworkHalf.assert_called_once() + project.Sync_LocalHalf.assert_not_called() + + def test_worker_fetch_fails_exception(self): + """Test _InterleavedSyncWorker with an exception during fetch.""" + opt = self._get_opts() + project = self.projA + fetch_error = GitError("Fetch failed") + project.Sync_NetworkHalf = mock.Mock(side_effect=fetch_error) + project.Sync_LocalHalf = mock.Mock() + self.mock_context["projects"] = [project] + + result_obj = self.cmd._InterleavedSyncWorker(opt, [0]) + result = result_obj.results[0] + + self.assertFalse(result.fetch_success) + self.assertFalse(result.checkout_success) + self.assertEqual(result.fetch_error, fetch_error) + project.Sync_NetworkHalf.assert_called_once() + project.Sync_LocalHalf.assert_not_called() + + def test_worker_checkout_fails(self): + """Test _InterleavedSyncWorker with an exception during checkout.""" + opt = self._get_opts() + project = self.projA + project.Sync_NetworkHalf = mock.Mock( + return_value=SyncNetworkHalfResult(error=None, remote_fetched=True) + ) + checkout_error = GitError("Checkout failed") + project.Sync_LocalHalf = mock.Mock(side_effect=checkout_error) + project.manifest.manifestProject.config = mock.MagicMock() + self.mock_context["projects"] = [project] + + with mock.patch("subcmds.sync.SyncBuffer"): + result_obj = self.cmd._InterleavedSyncWorker(opt, [0]) + result = result_obj.results[0] + + self.assertTrue(result.fetch_success) + self.assertFalse(result.checkout_success) + self.assertIsNone(result.fetch_error) + self.assertEqual(result.checkout_error, checkout_error) + project.Sync_NetworkHalf.assert_called_once() + project.Sync_LocalHalf.assert_called_once() + + def test_worker_local_only(self): + """Test _InterleavedSyncWorker with --local-only.""" + opt = self._get_opts(["--interleaved", "--local-only"]) + project = self.projA + project.Sync_NetworkHalf = mock.Mock() + project.Sync_LocalHalf = mock.Mock() + project.manifest.manifestProject.config = mock.MagicMock() + self.mock_context["projects"] = [project] + + with mock.patch("subcmds.sync.SyncBuffer") as mock_sync_buffer: + mock_sync_buf_instance = mock.MagicMock() + mock_sync_buf_instance.Finish.return_value = True + mock_sync_buffer.return_value = mock_sync_buf_instance + + result_obj = self.cmd._InterleavedSyncWorker(opt, [0]) + result = result_obj.results[0] + + self.assertTrue(result.fetch_success) + self.assertTrue(result.checkout_success) + project.Sync_NetworkHalf.assert_not_called() + project.Sync_LocalHalf.assert_called_once() + + def test_worker_network_only(self): + """Test _InterleavedSyncWorker with --network-only.""" + opt = self._get_opts(["--interleaved", "--network-only"]) + project = self.projA + project.Sync_NetworkHalf = mock.Mock( + return_value=SyncNetworkHalfResult(error=None, remote_fetched=True) + ) + project.Sync_LocalHalf = mock.Mock() + self.mock_context["projects"] = [project] + + result_obj = self.cmd._InterleavedSyncWorker(opt, [0]) + result = result_obj.results[0] + + self.assertTrue(result.fetch_success) + self.assertTrue(result.checkout_success) + project.Sync_NetworkHalf.assert_called_once() + project.Sync_LocalHalf.assert_not_called()