sync: Add orchestration logic for --interleaved
Introduce the parallel orchestration framework for `repo sync --interleaved`. The new logic respects project dependencies by processing projects in hierarchical levels. Projects that share a git object directory are grouped and processed serially. The familiar fetch progress bar UX is reused.

Bug: 421935613
Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
This commit is contained in:
parent
85352825ff
commit
fc39a70f48
2 changed files with 372 additions and 12 deletions
260
subcmds/sync.py
260
subcmds/sync.py
|
@ -25,7 +25,7 @@ from pathlib import Path
|
|||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from typing import List, NamedTuple, Set, Union
|
||||
from typing import List, NamedTuple, Optional, Set, Union
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
@ -194,6 +194,46 @@ class _CheckoutOneResult(NamedTuple):
|
|||
finish: float
|
||||
|
||||
|
||||
class _SyncResult(NamedTuple):
    """Outcome of syncing one project in interleaved mode.

    The error and timestamp fields are Optional and may be None.
    Timestamps, when present, are time.time() values.

    Attributes:
        relpath: Path of the project relative to the top of the repo client.
        fetch_success: Whether the fetch step succeeded.
        checkout_success: Whether the checkout step succeeded.
        fetch_error: Exception raised by a failed fetch, otherwise None.
        checkout_error: Exception raised by a failed checkout, otherwise
            None.
        fetch_start: When the fetch began.
        fetch_finish: When the fetch ended.
        checkout_start: When the checkout began.
        checkout_finish: When the checkout ended.
    """

    # Identity and per-step status.
    relpath: str
    fetch_success: bool
    checkout_success: bool
    fetch_error: Optional[Exception]
    checkout_error: Optional[Exception]

    # Per-step timing.
    fetch_start: Optional[float]
    fetch_finish: Optional[float]
    checkout_start: Optional[float]
    checkout_finish: Optional[float]
|
||||
|
||||
|
||||
class _InterleavedSyncResult(NamedTuple):
    """What a single interleaved sync worker hands back.

    Attributes:
        results: One _SyncResult per project assigned to the worker. The
            list is empty when the worker failed unexpectedly before it
            produced any result.
    """

    results: List[_SyncResult]
|
||||
|
||||
|
||||
class SuperprojectError(SyncError):
|
||||
"""Superproject sync repo."""
|
||||
|
||||
|
@ -1828,6 +1868,16 @@ later is required to fix a server side protocol bug.
|
|||
all_manifests=not opt.this_manifest_only,
|
||||
)
|
||||
|
||||
# Log the repo projects by existing and new.
|
||||
existing = [x for x in all_projects if x.Exists]
|
||||
mp.config.SetString("repo.existingprojectcount", str(len(existing)))
|
||||
mp.config.SetString(
|
||||
"repo.newprojectcount", str(len(all_projects) - len(existing))
|
||||
)
|
||||
|
||||
self._fetch_times = _FetchTimes(manifest)
|
||||
self._local_sync_state = LocalSyncState(manifest)
|
||||
|
||||
if opt.interleaved:
|
||||
sync_method = self._SyncInterleaved
|
||||
else:
|
||||
|
@ -1890,15 +1940,6 @@ later is required to fix a server side protocol bug.
|
|||
err_update_projects = False
|
||||
err_update_linkfiles = False
|
||||
|
||||
# Log the repo projects by existing and new.
|
||||
existing = [x for x in all_projects if x.Exists]
|
||||
mp.config.SetString("repo.existingprojectcount", str(len(existing)))
|
||||
mp.config.SetString(
|
||||
"repo.newprojectcount", str(len(all_projects) - len(existing))
|
||||
)
|
||||
|
||||
self._fetch_times = _FetchTimes(manifest)
|
||||
self._local_sync_state = LocalSyncState(manifest)
|
||||
if not opt.local_only:
|
||||
with multiprocessing.Manager() as manager:
|
||||
with ssh.ProxyManager(manager) as ssh_proxy:
|
||||
|
@ -2003,6 +2044,88 @@ later is required to fix a server side protocol bug.
|
|||
)
|
||||
raise SyncError(aggregate_errors=errors)
|
||||
|
||||
@classmethod
def _InterleavedSyncWorker(
    cls, opt, project_indices
) -> _InterleavedSyncResult:
    """Worker for interleaved sync.

    Syncs one group of projects that share a git object directory. The
    projects of the group are handled one after another inside this single
    call.

    While the group is being processed, an entry keyed by the first
    project of the group is kept in the shared `sync_dict` (value: the
    time.time() at which the worker started); it is removed again when the
    worker finishes, whether or not an exception occurred.

    Args:
        opt: Program options returned from optparse.
        project_indices: A list of indices into the projects list stored in
            the parallel context.

    Returns:
        An `_InterleavedSyncResult` containing one `_SyncResult` per
        project, in the order of `project_indices`. The result list is
        empty when `project_indices` is empty.
    """
    # An empty group has no representative project for the progress bar;
    # indexing project_indices[0] below would raise IndexError.
    if not project_indices:
        return _InterleavedSyncResult(results=[])

    context = cls.get_parallel_context()
    projects = context["projects"]
    sync_dict = context["sync_dict"]

    # Use the first project as the representative for the progress bar.
    first_project = projects[project_indices[0]]
    key = f"{first_project.name} @ {first_project.relpath}"
    sync_dict[key] = time.time()

    results = []
    try:
        for idx in project_indices:
            project = projects[idx]
            # For now, simulate a successful sync.
            # TODO(b/421935613): Perform the actual git fetch and checkout.
            results.append(
                _SyncResult(
                    relpath=project.relpath,
                    fetch_success=True,
                    checkout_success=True,
                    fetch_error=None,
                    checkout_error=None,
                    fetch_start=None,
                    fetch_finish=None,
                    checkout_start=None,
                    checkout_finish=None,
                )
            )
    finally:
        # Always drop the in-progress marker, even if a project raised.
        del sync_dict[key]

    return _InterleavedSyncResult(results=results)
|
||||
|
||||
def _ProcessSyncInterleavedResults(
    self,
    synced_relpaths: Set[str],
    err_event: _threading.Event,
    errors: List[Exception],
    opt: optparse.Values,
    pool: Optional[multiprocessing.Pool],
    pm: Progress,
    results_sets: List[_InterleavedSyncResult],
):
    """Fold worker results from an interleaved sync pass into shared state.

    Args:
        synced_relpaths: Set that receives the relpath of every project
            whose fetch and checkout both succeeded.
        err_event: Event that is set as soon as any project fails.
        errors: List that receives the fetch/checkout exceptions of failed
            projects.
        opt: Program options; only `fail_fast` is read here.
        pool: The worker pool, or None. It is closed when a failure is
            seen and `opt.fail_fast` is set.
        pm: Progress bar, advanced once per processed project result.
        results_sets: The `_InterleavedSyncResult` objects to process.

    Returns:
        True if every processed result succeeded, False otherwise.
    """
    all_ok = True
    for group in results_sets:
        for item in group.results:
            pm.update()
            if not (item.fetch_success and item.checkout_success):
                all_ok = False
                err_event.set()
                # Record whichever step errors are present, fetch first.
                errors.extend(
                    exc
                    for exc in (item.fetch_error, item.checkout_error)
                    if exc
                )
                continue
            synced_relpaths.add(item.relpath)

        # Checked once per result group: stop consuming further groups
        # after a failure when fail-fast was requested.
        if not all_ok and opt.fail_fast:
            if pool:
                pool.close()
            break
    return all_ok
|
||||
|
||||
def _SyncInterleaved(
|
||||
self,
|
||||
opt,
|
||||
|
@ -2026,7 +2149,122 @@ later is required to fix a server side protocol bug.
|
|||
2. Projects that share git objects are processed serially to prevent
|
||||
race conditions.
|
||||
"""
|
||||
raise NotImplementedError("Interleaved sync is not implemented yet.")
|
||||
err_event = multiprocessing.Event()
|
||||
synced_relpaths = set()
|
||||
project_list = list(all_projects)
|
||||
pm = Progress(
|
||||
"Syncing",
|
||||
len(project_list),
|
||||
delay=False,
|
||||
quiet=opt.quiet,
|
||||
show_elapsed=True,
|
||||
)
|
||||
previously_pending_relpaths = set()
|
||||
|
||||
sync_event = _threading.Event()
|
||||
|
||||
def _MonitorSyncLoop():
|
||||
while True:
|
||||
pm.update(inc=0, msg=self._GetSyncProgressMessage())
|
||||
if sync_event.wait(timeout=1):
|
||||
return
|
||||
|
||||
sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
|
||||
sync_progress_thread.daemon = True
|
||||
|
||||
with self.ParallelContext():
|
||||
self.get_parallel_context()[
|
||||
"sync_dict"
|
||||
] = multiprocessing.Manager().dict()
|
||||
sync_progress_thread.start()
|
||||
|
||||
try:
|
||||
# Outer loop for dynamic project discovery (e.g., submodules).
|
||||
# It continues until no unsynced projects remain.
|
||||
while True:
|
||||
projects_to_sync = [
|
||||
p
|
||||
for p in project_list
|
||||
if p.relpath not in synced_relpaths
|
||||
]
|
||||
if not projects_to_sync:
|
||||
break
|
||||
|
||||
pending_relpaths = {p.relpath for p in projects_to_sync}
|
||||
if previously_pending_relpaths == pending_relpaths:
|
||||
logger.warning(
|
||||
"Stall detected in interleaved sync, not all "
|
||||
"projects could be synced."
|
||||
)
|
||||
err_event.set()
|
||||
break
|
||||
previously_pending_relpaths = pending_relpaths
|
||||
|
||||
# Update the projects list for workers in the current pass.
|
||||
self.get_parallel_context()["projects"] = projects_to_sync
|
||||
project_index_map = {
|
||||
p: i for i, p in enumerate(projects_to_sync)
|
||||
}
|
||||
|
||||
# Inner loop to process projects in a hierarchical order.
|
||||
# This iterates through levels of project dependencies (e.g.
|
||||
# 'foo' then 'foo/bar'). All projects in one level can be
|
||||
# processed in parallel, but we must wait for a level to
|
||||
# complete before starting the next.
|
||||
for level_projects in _SafeCheckoutOrder(projects_to_sync):
|
||||
if not level_projects:
|
||||
continue
|
||||
|
||||
objdir_project_map = collections.defaultdict(list)
|
||||
for p in level_projects:
|
||||
objdir_project_map[p.objdir].append(
|
||||
project_index_map[p]
|
||||
)
|
||||
|
||||
work_items = list(objdir_project_map.values())
|
||||
if not work_items:
|
||||
continue
|
||||
|
||||
jobs = max(1, min(opt.jobs, len(work_items)))
|
||||
callback = functools.partial(
|
||||
self._ProcessSyncInterleavedResults,
|
||||
synced_relpaths,
|
||||
err_event,
|
||||
errors,
|
||||
opt,
|
||||
)
|
||||
if not self.ExecuteInParallel(
|
||||
jobs,
|
||||
functools.partial(self._InterleavedSyncWorker, opt),
|
||||
work_items,
|
||||
callback=callback,
|
||||
output=pm,
|
||||
chunksize=1,
|
||||
):
|
||||
err_event.set()
|
||||
|
||||
if err_event.is_set() and opt.fail_fast:
|
||||
raise SyncFailFastError(aggregate_errors=errors)
|
||||
|
||||
self._ReloadManifest(None, manifest)
|
||||
project_list = self.GetProjects(
|
||||
args,
|
||||
missing_ok=True,
|
||||
submodules_ok=opt.fetch_submodules,
|
||||
manifest=manifest,
|
||||
all_manifests=not opt.this_manifest_only,
|
||||
)
|
||||
finally:
|
||||
sync_event.set()
|
||||
sync_progress_thread.join()
|
||||
|
||||
pm.end()
|
||||
|
||||
if err_event.is_set():
|
||||
logger.error(
|
||||
"error: Unable to fully sync the tree in interleaved mode."
|
||||
)
|
||||
raise SyncError(aggregate_errors=errors)
|
||||
|
||||
|
||||
def _PostRepoUpgrade(manifest, quiet=False):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue