
sync: Add orchestration logic for --interleaved

Introduce the parallel orchestration framework for `repo sync --interleaved`.

The new logic respects project dependencies by processing projects in
hierarchical levels. Projects that share a git object directory are grouped
into one work item and processed serially to prevent race conditions. Also
reuse the familiar fetch progress bar UX.

Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
Author: Gavin Mak
Date: 2025-06-12 04:36:55 +00:00
parent 85352825ff
commit 5de91e86d0
2 changed files with 372 additions and 12 deletions
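To make the scheduling idea concrete before reading the diff: projects are split into dependency levels (the diff relies on the existing _SafeCheckoutOrder helper for this), and within each level, projects that share a git object directory are collected into a single work item so one worker processes them back to back. The sketch below is illustrative only; Project, sync_group, and sync_level are hypothetical stand-ins, not repo's real classes.

import collections
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, List, NamedTuple


class Project(NamedTuple):
    """Illustrative stand-in for a repo project, not the real class."""

    relpath: str
    objdir: str


def sync_group(projects: List[Project]) -> List[str]:
    """Serially 'sync' a group of projects sharing one object directory."""
    # A real worker would fetch and checkout here; we just record the order.
    return [p.relpath for p in projects]


def sync_level(level: List[Project], jobs: int) -> List[str]:
    """Sync one dependency level: parallel across groups, serial within."""
    groups: Dict[str, List[Project]] = collections.defaultdict(list)
    for p in level:
        groups[p.objdir].append(p)  # Shared objdir -> one serial work item.
    work_items = list(groups.values())
    workers = max(1, min(jobs, len(work_items)))
    synced: List[str] = []
    with ProcessPoolExecutor(max_workers=workers) as pool:
        for result in pool.map(sync_group, work_items):
            synced.extend(result)
    return synced


if __name__ == "__main__":
    level = [
        Project("projA", "common_objdir"),
        Project("projB", "objB"),
        Project("projC", "common_objdir"),
    ]
    # projA and projC share an objdir, so only two work items run.
    print(sync_level(level, jobs=4))

With jobs=4 but only two distinct object directories, the level collapses to two work items, mirroring the jobs = max(1, min(opt.jobs, len(work_items))) clamp in the diff below.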


@@ -25,7 +25,7 @@ from pathlib import Path
 import sys
 import tempfile
 import time
-from typing import List, NamedTuple, Set, Union
+from typing import List, Optional, NamedTuple, Set, Union
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -194,6 +194,46 @@ class _CheckoutOneResult(NamedTuple):
     finish: float


+class _SyncResult(NamedTuple):
+    """Individual project sync result for interleaved mode.
+
+    Attributes:
+        relpath: The relative path of the project from the top of the repo
+            client.
+        fetch_success: True if the fetch operation was successful.
+        checkout_success: True if the checkout operation was successful.
+        fetch_error: The Exception object if the fetch failed, else None.
+        checkout_error: The Exception object if the checkout failed, else
+            None.
+        fetch_start_time: A time.time() timestamp of when the fetch started.
+        fetch_finish_time: A time.time() timestamp of when the fetch
+            finished.
+        checkout_start_time: A time.time() timestamp of when the checkout
+            started.
+        checkout_finish_time: A time.time() timestamp of when the checkout
+            finished.
+    """
+
+    relpath: str
+    fetch_success: bool
+    checkout_success: bool
+    fetch_error: Union[Exception, None]
+    checkout_error: Union[Exception, None]
+    # Timestamps for logging.
+    fetch_start_time: float
+    fetch_finish_time: float
+    checkout_start_time: float
+    checkout_finish_time: float
+
+
+class _InterleavedSyncResult(NamedTuple):
+    """The result of an interleaved sync worker.
+
+    Attributes:
+        results: A list of _SyncResult objects, one for each project the
+            worker was assigned to process. This list is empty if the worker
+            fails unexpectedly before generating any results.
+    """
+
+    results: List[_SyncResult]
+
+
 class SuperprojectError(SyncError):
     """Superproject sync repo."""
@@ -1828,6 +1868,16 @@ later is required to fix a server side protocol bug.
             all_manifests=not opt.this_manifest_only,
         )

+        # Log the repo projects by existing and new.
+        existing = [x for x in all_projects if x.Exists]
+        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
+        mp.config.SetString(
+            "repo.newprojectcount", str(len(all_projects) - len(existing))
+        )
+
+        self._fetch_times = _FetchTimes(manifest)
+        self._local_sync_state = LocalSyncState(manifest)
+
         if opt.interleaved:
             sync_method = self._SyncInterleaved
         else:
@@ -1890,15 +1940,6 @@ later is required to fix a server side protocol bug.
         err_update_projects = False
         err_update_linkfiles = False

-        # Log the repo projects by existing and new.
-        existing = [x for x in all_projects if x.Exists]
-        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
-        mp.config.SetString(
-            "repo.newprojectcount", str(len(all_projects) - len(existing))
-        )
-        self._fetch_times = _FetchTimes(manifest)
-        self._local_sync_state = LocalSyncState(manifest)
-
         if not opt.local_only:
             with multiprocessing.Manager() as manager:
                 with ssh.ProxyManager(manager) as ssh_proxy:
@@ -2003,6 +2044,88 @@ later is required to fix a server side protocol bug.
                 )
             raise SyncError(aggregate_errors=errors)

+    @classmethod
+    def _InterleavedSyncWorker(
+        cls, opt, project_indices
+    ) -> _InterleavedSyncResult:
+        """Worker for interleaved sync.
+
+        This function is responsible for syncing a group of projects that
+        share a git object directory.
+
+        Args:
+            opt: Program options returned from optparse.
+            project_indices: A list of indices into the projects list stored
+                in the parallel context.
+
+        Returns:
+            An `_InterleavedSyncResult` containing the results for each
+            project.
+        """
+        results = []
+        context = cls.get_parallel_context()
+        projects = context["projects"]
+        sync_dict = context["sync_dict"]
+
+        # Use the first project as the representative for the progress bar.
+        first_project = projects[project_indices[0]]
+        key = f"{first_project.name} @ {first_project.relpath}"
+        start_time = time.time()
+        sync_dict[key] = start_time
+
+        try:
+            for idx in project_indices:
+                project = projects[idx]
+                # For now, simulate a successful sync.
+                # TODO(b/421935613): Perform the actual git fetch and checkout.
+                results.append(
+                    _SyncResult(
+                        relpath=project.relpath,
+                        fetch_success=True,
+                        checkout_success=True,
+                        fetch_error=None,
+                        checkout_error=None,
+                        fetch_start_time=0,
+                        fetch_finish_time=0,
+                        checkout_start_time=0,
+                        checkout_finish_time=0,
+                    )
+                )
+        finally:
+            del sync_dict[key]
+
+        return _InterleavedSyncResult(results=results)
+
+    def _ProcessSyncInterleavedResults(
+        self,
+        synced_relpaths: Set[str],
+        err_event: _threading.Event,
+        errors: List[Exception],
+        opt: optparse.Values,
+        pool: Optional[multiprocessing.Pool],
+        pm: Progress,
+        results_sets: List[_InterleavedSyncResult],
+    ):
+        """Callback to process results from interleaved sync workers."""
+        ret = True
+        for result_group in results_sets:
+            for result in result_group.results:
+                pm.update()
+                if result.fetch_success and result.checkout_success:
+                    synced_relpaths.add(result.relpath)
+                else:
+                    ret = False
+                    err_event.set()
+                    if result.fetch_error:
+                        errors.append(result.fetch_error)
+                    if result.checkout_error:
+                        errors.append(result.checkout_error)
+            if not ret and opt.fail_fast:
+                if pool:
+                    pool.close()
+                break
+        return ret
+
     def _SyncInterleaved(
         self,
         opt,
@@ -2026,7 +2149,122 @@ later is required to fix a server side protocol bug.
         2. Projects that share git objects are processed serially to prevent
            race conditions.
         """
-        raise NotImplementedError("Interleaved sync is not implemented yet.")
+        err_event = multiprocessing.Event()
+        synced_relpaths = set()
+        project_list = list(all_projects)
+        pm = Progress(
+            "Syncing",
+            len(project_list),
+            delay=False,
+            quiet=opt.quiet,
+            show_elapsed=True,
+        )
+        previously_pending_relpaths = set()
+        sync_event = _threading.Event()
+
+        def _MonitorSyncLoop():
+            while True:
+                pm.update(inc=0, msg=self._GetSyncProgressMessage())
+                if sync_event.wait(timeout=1):
+                    return
+
+        sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
+        sync_progress_thread.daemon = True
+
+        with self.ParallelContext():
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
+            sync_progress_thread.start()
+
+            try:
+                # Outer loop for dynamic project discovery (e.g., submodules).
+                # It continues until no unsynced projects remain.
+                while True:
+                    projects_to_sync = [
+                        p
+                        for p in project_list
+                        if p.relpath not in synced_relpaths
+                    ]
+                    if not projects_to_sync:
+                        break
+
+                    pending_relpaths = {p.relpath for p in projects_to_sync}
+                    if previously_pending_relpaths == pending_relpaths:
+                        logger.warning(
+                            "Stall detected in interleaved sync, not all "
+                            "projects could be synced."
+                        )
+                        err_event.set()
+                        break
+                    previously_pending_relpaths = pending_relpaths
+
+                    # Update the projects list for workers in the current pass.
+                    self.get_parallel_context()["projects"] = projects_to_sync
+                    project_index_map = {
+                        p: i for i, p in enumerate(projects_to_sync)
+                    }
+
+                    # Inner loop to process projects in a hierarchical order.
+                    # This iterates through levels of project dependencies (e.g.
+                    # 'foo' then 'foo/bar'). All projects in one level can be
+                    # processed in parallel, but we must wait for a level to
+                    # complete before starting the next.
+                    for level_projects in _SafeCheckoutOrder(projects_to_sync):
+                        if not level_projects:
+                            continue
+
+                        objdir_project_map = collections.defaultdict(list)
+                        for p in level_projects:
+                            objdir_project_map[p.objdir].append(
+                                project_index_map[p]
+                            )
+                        work_items = list(objdir_project_map.values())
+                        if not work_items:
+                            continue
+
+                        jobs = max(1, min(opt.jobs, len(work_items)))
+                        callback = functools.partial(
+                            self._ProcessSyncInterleavedResults,
+                            synced_relpaths,
+                            err_event,
+                            errors,
+                            opt,
+                        )
+                        if not self.ExecuteInParallel(
+                            jobs,
+                            functools.partial(self._InterleavedSyncWorker, opt),
+                            work_items,
+                            callback=callback,
+                            output=pm,
+                            chunksize=1,
+                        ):
+                            err_event.set()
+
+                        if err_event.is_set() and opt.fail_fast:
+                            raise SyncFailFastError(aggregate_errors=errors)
+
+                    self._ReloadManifest(None, manifest)
+                    project_list = self.GetProjects(
+                        args,
+                        missing_ok=True,
+                        submodules_ok=opt.fetch_submodules,
+                        manifest=manifest,
+                        all_manifests=not opt.this_manifest_only,
+                    )
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
+
+        pm.end()
+
+        if err_event.is_set():
+            logger.error(
+                "error: Unable to fully sync the tree in interleaved mode."
+            )
+            raise SyncError(aggregate_errors=errors)


 def _PostRepoUpgrade(manifest, quiet=False):
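Two details of _SyncInterleaved above are worth calling out. The outer while True loop re-queries the project list after every pass so that projects discovered mid-sync (e.g. submodules) are picked up, and the previously_pending_relpaths comparison guarantees termination: if a full pass leaves the pending set unchanged, the loop declares a stall rather than spinning forever. Here is a minimal sketch of that fixed-point pattern, using hypothetical list_pending/sync_pass callables in place of the real worker machinery:

from typing import Callable, Set


def sync_until_stable(
    list_pending: Callable[[], Set[str]],
    sync_pass: Callable[[Set[str]], Set[str]],
) -> bool:
    """Repeat sync passes until nothing is pending or progress stalls.

    list_pending returns the relpaths still needing sync (the set may grow
    as new projects are discovered); sync_pass returns the relpaths it
    actually synced. Returns False if a stall was detected.
    """
    previously_pending: Set[str] = set()
    synced: Set[str] = set()
    while True:
        pending = list_pending() - synced
        if not pending:
            return True
        if pending == previously_pending:
            return False  # No progress since the last pass: stall.
        previously_pending = pending
        synced |= sync_pass(pending)


if __name__ == "__main__":
    # First pass syncs projA and discovers a submodule; second pass syncs it.
    discovered = {"projA"}

    def list_pending() -> Set[str]:
        return set(discovered)

    def sync_pass(pending: Set[str]) -> Set[str]:
        discovered.add("projA/sub")  # Simulate submodule discovery.
        return set(pending)

    print(sync_until_stable(list_pending, sync_pass))  # True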


@@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase):


 class FakeProject:
-    def __init__(self, relpath):
+    def __init__(self, relpath, name=None, objdir=None):
         self.relpath = relpath
+        self.name = name or relpath
+        self.objdir = objdir or relpath

     def __str__(self):
         return f"project: {self.relpath}"
@@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase):
             self.cmd.Execute(self.opt, [])
         self.assertIn(self.sync_local_half_error, e.aggregate_errors)
         self.assertIn(self.sync_network_half_error, e.aggregate_errors)
+
+
+class InterleavedSyncTest(unittest.TestCase):
+    """Tests for interleaved sync."""
+
+    def setUp(self):
+        """Set up a sync command with mocks."""
+        self.repodir = tempfile.mkdtemp(".repo")
+        self.manifest = mock.MagicMock(repodir=self.repodir)
+        self.manifest.repoProject.LastFetch = time.time()
+        self.manifest.repoProject.worktree = self.repodir
+        self.manifest.manifestProject.worktree = self.repodir
+        self.manifest.IsArchive = False
+        self.manifest.CloneBundle = False
+        self.manifest.default.sync_j = 1
+
+        self.cmd = sync.Sync(manifest=self.manifest)
+        self.cmd.outer_manifest = self.manifest
+
+        # Mock projects.
+        self.projA = FakeProject("projA", objdir="objA")
+        self.projB = FakeProject("projB", objdir="objB")
+        self.projA_sub = FakeProject(
+            "projA/sub", name="projA_sub", objdir="objA_sub"
+        )
+        self.projC = FakeProject("projC", objdir="objC")
+
+        # Mock methods that are not part of the core interleaved sync logic.
+        mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start()
+        mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start()
+        mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start()
+        mock.patch.object(sync, "_PostRepoUpgrade").start()
+        mock.patch.object(sync, "_PostRepoFetch").start()
+
+    def tearDown(self):
+        """Clean up resources."""
+        shutil.rmtree(self.repodir)
+        mock.patch.stopall()
+
+    def test_interleaved_fail_fast(self):
+        """Test that --fail-fast is respected in interleaved mode."""
+        opt, args = self.cmd.OptionParser.parse_args(
+            ["--interleaved", "--fail-fast", "-j2"]
+        )
+        opt.quiet = True
+
+        # With projA/sub, _SafeCheckoutOrder creates two batches:
+        # 1. [projA, projB]
+        # 2. [projA/sub]
+        # We want to fail on the first batch and ensure the second isn't run.
+        all_projects = [self.projA, self.projB, self.projA_sub]
+        mock.patch.object(
+            self.cmd, "GetProjects", return_value=all_projects
+        ).start()
+
+        # Mock ExecuteInParallel to simulate a failed run on the first batch
+        # of projects.
+        execute_mock = mock.patch.object(
+            self.cmd, "ExecuteInParallel", return_value=False
+        ).start()
+
+        with self.assertRaises(sync.SyncFailFastError):
+            self.cmd._SyncInterleaved(
+                opt,
+                args,
+                [],
+                self.manifest,
+                self.manifest.manifestProject,
+                all_projects,
+                {},
+            )
+
+        execute_mock.assert_called_once()
+
+    def test_interleaved_shared_objdir_serial(self):
+        """Test that projects with shared objdir are processed serially."""
+        opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"])
+        opt.quiet = True
+
+        # Set up projects with a shared objdir.
+        self.projA.objdir = "common_objdir"
+        self.projC.objdir = "common_objdir"
+
+        all_projects = [self.projA, self.projB, self.projC]
+        mock.patch.object(
+            self.cmd, "GetProjects", return_value=all_projects
+        ).start()
+
+        def execute_side_effect(jobs, target, work_items, **kwargs):
+            # The callback is a partial object. The first arg is the set we
+            # need to update to avoid the stall detection.
+            synced_relpaths_set = kwargs["callback"].args[0]
+            projects_in_pass = self.cmd.get_parallel_context()["projects"]
+            for item in work_items:
+                for project_idx in item:
+                    synced_relpaths_set.add(
+                        projects_in_pass[project_idx].relpath
+                    )
+            return True
+
+        execute_mock = mock.patch.object(
+            self.cmd, "ExecuteInParallel", side_effect=execute_side_effect
+        ).start()
+
+        self.cmd._SyncInterleaved(
+            opt,
+            args,
+            [],
+            self.manifest,
+            self.manifest.manifestProject,
+            all_projects,
+            {},
+        )
+
+        execute_mock.assert_called_once()
+        jobs_arg, _, work_items = execute_mock.call_args.args
+        self.assertEqual(jobs_arg, 2)
+        work_items_sets = {frozenset(item) for item in work_items}
+        expected_sets = {frozenset([0, 2]), frozenset([1])}
+        self.assertEqual(work_items_sets, expected_sets)
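As a cross-check of test_interleaved_shared_objdir_serial, the expected work items and clamped job count can be reproduced with a few lines of plain Python; the index-to-objdir mapping below simply mirrors the test's [projA, projB, projC] ordering and is not part of the change itself:

import collections

# Index -> objdir, mirroring [projA, projB, projC] in the test.
objdirs = {0: "common_objdir", 1: "objB", 2: "common_objdir"}

objdir_map = collections.defaultdict(list)
for idx, objdir in objdirs.items():
    objdir_map[objdir].append(idx)

work_items = list(objdir_map.values())
print({frozenset(item) for item in work_items})  # {frozenset({0, 2}), frozenset({1})}
print(max(1, min(4, len(work_items))))  # jobs clamps from -j4 down to 2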