sync: worker
Change-Id: I398602e08a375e974a8914e5fa48ffae673dda9b
parent fc39a70f48
commit 3345c33901
2 changed files with 286 additions and 97 deletions
progress.py (15 changed lines)
@@ -195,6 +195,21 @@ class Progress:
                 )
             )
 
+    def display_message(self, msg):
+        """Clears the current progress line and prints a message above it.
+
+        The progress bar is then redrawn on the next line.
+        """
+        if not _TTY or IsTraceToStderr() or self._quiet:
+            return
+
+        # Erase the current line, print the message with a newline,
+        # and then immediately redraw the progress bar on the new line.
+        sys.stderr.write("\r" + CSI_ERASE_LINE)
+        sys.stderr.write(msg + "\n")
+        sys.stderr.flush()
+        self.update(inc=0)
+
     def end(self):
         self._update_event.set()
         if not _TTY or IsTraceToStderr() or self._quiet:
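For readers unfamiliar with the escape-sequence trick that display_message relies on, here is a minimal, self-contained sketch of the same technique outside repo: a carriage return plus the CSI "erase line" sequence clears the in-place progress line, the message is printed on its own line, and the bar is immediately redrawn below it. The ProgressBar class and its fields are illustrative stand-ins, not repo's Progress API.

import sys
import time

CSI_ERASE_LINE = "\x1b[2K"  # ANSI escape: erase the entire current line.


class ProgressBar:
    """Illustrative single-line progress bar (not repo's Progress class)."""

    def __init__(self, total):
        self._total = total
        self._done = 0

    def update(self, inc=1):
        self._done += inc
        # \r returns the cursor to column 0 so the bar redraws in place.
        sys.stderr.write(f"\rSyncing {self._done}/{self._total}")
        sys.stderr.flush()

    def display_message(self, msg):
        # Erase the bar, print the message on its own line, then redraw.
        sys.stderr.write("\r" + CSI_ERASE_LINE)
        sys.stderr.write(msg + "\n")
        sys.stderr.flush()
        self.update(inc=0)


if __name__ == "__main__":
    pm = ProgressBar(3)
    for i in range(3):
        pm.update()
        time.sleep(0.5)
        pm.display_message(f"project{i}: fetch output ...")
    sys.stderr.write("\n")

Writing to stderr keeps the bar separate from program output on stdout, which is also why repo only draws it when stderr is a TTY.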
subcmds/sync.py (368 changed lines)
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import collections
+import contextlib
 import functools
 import http.cookiejar as cookielib
 import io
@@ -198,29 +199,37 @@ class _SyncResult(NamedTuple):
     """Individual project sync result for interleaved mode.
 
     Attributes:
         project_index: The index of the project in the shared list.
         relpath: The relative path of the project from the top of the repo client.
         fetch_success: True if the fetch operation was successful.
-        checkout_success: True if the checkout operation was successful.
+        remote_fetched: True if the remote was actually queried.
         fetch_error: The Exception object if the fetch failed, else None.
-        checkout_error: The Exception object if the checkout failed, else None.
         fetch_start: A time.time() timestamp of when the fetch started.
         fetch_finish: A time.time() timestamp of when the fetch finished.
+        checkout_success: True if the checkout operation was successful.
+        checkout_error: The Exception object if the checkout failed, else None.
         checkout_start: A time.time() timestamp of when the checkout started.
         checkout_finish: A time.time() timestamp of when the checkout
             finished.
+        stderr_text: The combined output from both fetch and checkout operations.
     """
 
     project_index: int
     relpath: str
-    fetch_success: bool
-    checkout_success: bool
-    fetch_error: Optional[Exception]
-    checkout_error: Optional[Exception]
+
+    fetch_success: bool
+    remote_fetched: bool
+    fetch_error: Optional[Exception]
     fetch_start: Optional[float]
     fetch_finish: Optional[float]
+
+    checkout_success: bool
+    checkout_error: Optional[Exception]
     checkout_start: Optional[float]
     checkout_finish: Optional[float]
+
+    stderr_text: str
 
 
 class _InterleavedSyncResult(NamedTuple):
     """The result of an interleaved sync worker.
 
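The regrouped fields make the phase bookkeeping explicit: each phase carries its own success flag, error, and optional start/finish timestamps, so a pass that skips a phase simply leaves its timestamps as None. A self-contained sketch of how a worker might populate and a callback might consume such a result; SyncResult and report here are illustrative stand-ins, not repo's _SyncResult or its callbacks.

import time
from typing import NamedTuple, Optional


class SyncResult(NamedTuple):
    """Illustrative stand-in mirroring the new field grouping."""

    project_index: int
    relpath: str

    fetch_success: bool
    remote_fetched: bool
    fetch_error: Optional[Exception]
    fetch_start: Optional[float]
    fetch_finish: Optional[float]

    checkout_success: bool
    checkout_error: Optional[Exception]
    checkout_start: Optional[float]
    checkout_finish: Optional[float]

    stderr_text: str


def report(result: SyncResult) -> None:
    # Optional timestamps let local-only / network-only passes skip phases.
    if result.fetch_start is not None and result.fetch_finish is not None:
        print(f"{result.relpath}: fetched in "
              f"{result.fetch_finish - result.fetch_start:.2f}s")
    if result.checkout_start is None:
        print(f"{result.relpath}: checkout skipped")


start = time.time()
r = SyncResult(
    project_index=0, relpath="foo/bar",
    fetch_success=True, remote_fetched=True, fetch_error=None,
    fetch_start=start, fetch_finish=time.time(),
    checkout_success=True, checkout_error=None,
    checkout_start=None, checkout_finish=None,
    stderr_text="",
)
report(r)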
@@ -2065,29 +2074,120 @@ later is required to fix a server side protocol bug.
         context = cls.get_parallel_context()
         projects = context["projects"]
         sync_dict = context["sync_dict"]
+        ssh_proxy = context.get("ssh_proxy")
 
         # Use the first project as the representative for the progress bar.
         first_project = projects[project_indices[0]]
         key = f"{first_project.name} @ {first_project.relpath}"
-        start_time = time.time()
-        sync_dict[key] = start_time
+        sync_dict[key] = time.time()
 
         try:
             for idx in project_indices:
                 project = projects[idx]
-                # For now, simulate a successful sync.
-                # TODO(b/421935613): Perform the actual git fetch and checkout.
+
+                fetch_success = False
+                remote_fetched = False
+                fetch_error = None
+                fetch_start = None
+                fetch_finish = None
+                network_output = ""
+
+                if not opt.local_only:
+                    fetch_start = time.time()
+                    network_output_capture = io.StringIO()
+                    try:
+                        sync_result = project.Sync_NetworkHalf(
+                            quiet=opt.quiet,
+                            verbose=opt.verbose,
+                            output_redir=network_output_capture,
+                            current_branch_only=cls._GetCurrentBranchOnly(
+                                opt, project.manifest
+                            ),
+                            force_sync=opt.force_sync,
+                            clone_bundle=opt.clone_bundle,
+                            tags=opt.tags,
+                            archive=project.manifest.IsArchive,
+                            optimized_fetch=opt.optimized_fetch,
+                            retry_fetches=opt.retry_fetches,
+                            prune=opt.prune,
+                            ssh_proxy=ssh_proxy,
+                            clone_filter=project.manifest.CloneFilter,
+                            partial_clone_exclude=project.manifest.PartialCloneExclude,
+                            clone_filter_for_depth=project.manifest.CloneFilterForDepth,
+                        )
+                        fetch_success = sync_result.success
+                        remote_fetched = sync_result.remote_fetched
+                        if sync_result.error:
+                            fetch_error = sync_result.error
+                    except Exception as e:
+                        fetch_success = False
+                        fetch_error = e
+                    finally:
+                        fetch_finish = time.time()
+                        network_output = network_output_capture.getvalue()
+                else:
+                    fetch_success = True
+
+                checkout_success = False
+                checkout_error = None
+                checkout_start = None
+                checkout_finish = None
+                checkout_stderr = ""
+
+                if fetch_success and not opt.network_only:
+                    checkout_start = time.time()
+                    stderr_capture = io.StringIO()
+                    try:
+                        with contextlib.redirect_stderr(stderr_capture):
+                            syncbuf = SyncBuffer(
+                                project.manifest.manifestProject.config,
+                                detach_head=opt.detach_head,
+                            )
+                            local_half_errors = []
+                            project.Sync_LocalHalf(
+                                syncbuf,
+                                force_sync=opt.force_sync,
+                                force_checkout=opt.force_checkout,
+                                force_rebase=opt.rebase,
+                                errors=local_half_errors,
+                                verbose=opt.verbose,
+                            )
+                            checkout_success = syncbuf.Finish()
+                            if local_half_errors:
+                                checkout_success = False
+                                checkout_error = SyncError(
+                                    aggregate_errors=local_half_errors
+                                )
+                    except Exception as e:
+                        checkout_success = False
+                        checkout_error = e
+                    finally:
+                        checkout_finish = time.time()
+                        checkout_stderr = stderr_capture.getvalue()
+                elif fetch_success:
+                    checkout_success = True
+
+                # Consolidate all captured output.
+                stderr_text = ""
+                if network_output:
+                    stderr_text += "fetch output:\n" + network_output
+                if checkout_stderr:
+                    stderr_text += "\ncheckout output:\n" + checkout_stderr
+
                 results.append(
                     _SyncResult(
                         project_index=idx,
                         relpath=project.relpath,
-                        fetch_success=True,
-                        checkout_success=True,
-                        fetch_error=None,
-                        checkout_error=None,
-                        fetch_start=None,
-                        fetch_finish=None,
-                        checkout_start=None,
-                        checkout_finish=None,
+                        fetch_success=fetch_success,
+                        remote_fetched=remote_fetched,
+                        checkout_success=checkout_success,
+                        fetch_error=fetch_error,
+                        checkout_error=checkout_error,
+                        stderr_text=stderr_text.strip(),
+                        fetch_start=fetch_start,
+                        fetch_finish=fetch_finish,
+                        checkout_start=checkout_start,
+                        checkout_finish=checkout_finish,
                     )
                 )
         finally:
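The checkout half captures worker output with io.StringIO plus contextlib.redirect_stderr while recording wall-clock timestamps in a try/finally. A minimal sketch of that capture-and-time pattern in isolation; run_phase and noisy_checkout are hypothetical names, not repo functions.

import contextlib
import io
import sys
import time


def run_phase(fn):
    """Run one sync phase, capturing stderr and recording wall-clock times.

    Returns (success, error, start, finish, captured_output).
    """
    start = time.time()
    capture = io.StringIO()
    success, error = False, None
    try:
        # Redirect Python-level sys.stderr writes into the buffer.
        with contextlib.redirect_stderr(capture):
            fn()
        success = True
    except Exception as e:
        error = e
    finally:
        # finally guarantees a finish timestamp even on failure.
        finish = time.time()
    return success, error, start, finish, capture.getvalue()


def noisy_checkout():
    print("warning: detached HEAD", file=sys.stderr)


ok, err, t0, t1, text = run_phase(noisy_checkout)
print(ok, err, round(t1 - t0, 3), repr(text))

Note that contextlib.redirect_stderr only captures Python-level writes to sys.stderr, not output from child processes; that is presumably why the fetch half instead passes an explicit output_redir stream into Sync_NetworkHalf.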
@@ -2107,9 +2207,39 @@ later is required to fix a server side protocol bug.
         ):
             """Callback to process results from interleaved sync workers."""
             ret = True
             projects = self.get_parallel_context()["projects"]
             for result_group in results_sets:
                 for result in result_group.results:
                     pm.update()
                     project = projects[result.project_index]
+
+                    if opt.verbose and result.stderr_text:
+                        pm.display_message(result.stderr_text)
+
+                    if result.fetch_start:
+                        self._fetch_times.Set(
+                            project,
+                            result.fetch_finish - result.fetch_start,
+                        )
+                        self._local_sync_state.SetFetchTime(project)
+                        self.event_log.AddSync(
+                            project,
+                            event_log.TASK_SYNC_NETWORK,
+                            result.fetch_start,
+                            result.fetch_finish,
+                            result.fetch_success,
+                        )
+                    if result.checkout_start:
+                        if result.checkout_success:
+                            self._local_sync_state.SetCheckoutTime(project)
+                        self.event_log.AddSync(
+                            project,
+                            event_log.TASK_SYNC_LOCAL,
+                            result.checkout_start,
+                            result.checkout_finish,
+                            result.checkout_success,
+                        )
+
                     if result.fetch_success and result.checkout_success:
                         synced_relpaths.add(result.relpath)
                     else:
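The callback runs in the parent process, so it can safely mutate shared bookkeeping (synced_relpaths, fetch times, the event log) without locks. This is not repo's ExecuteInParallel, but a generic sketch of the same shape using multiprocessing.Pool: workers return plain result tuples, and a functools.partial-bound callback folds them into parent-side state. All names here are illustrative.

import functools
import multiprocessing


def worker(indices):
    # Each work item is a list of project indices sharing an objdir.
    return [(i, True) for i in indices]


def process_results(synced, results):
    # Runs in the parent process; safe to mutate shared bookkeeping here.
    ok = True
    for idx, success in results:
        if success:
            synced.add(idx)
        else:
            ok = False
    return ok


if __name__ == "__main__":
    work_items = [[0, 1], [2], [3, 4]]
    synced = set()
    callback = functools.partial(process_results, synced)
    with multiprocessing.Pool(2) as pool:
        for res in pool.imap_unordered(worker, work_items, chunksize=1):
            callback(res)
    print(sorted(synced))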
@@ -2149,6 +2279,12 @@ later is required to fix a server side protocol bug.
         2. Projects that share git objects are processed serially to prevent
            race conditions.
         """
+        if (
+            not opt.local_only
+            and os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0"
+        ):
+            _PostRepoFetch(mp, opt.repo_verify)
+
         err_event = multiprocessing.Event()
         synced_relpaths = set()
         project_list = list(all_projects)
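Per the condition above, the self-update check runs only for non-local syncs and only while REPO_SKIP_SELF_UPDATE is unset or "0"; any other value disables it. A tiny sketch of that gate's behavior (maybe_self_update is a hypothetical stand-in for the real call into _PostRepoFetch):

import os


def maybe_self_update(local_only: bool) -> None:
    # Skipped for local-only syncs and when the user opts out via
    # REPO_SKIP_SELF_UPDATE (any value other than "0" disables it).
    if not local_only and os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0":
        print("checking for repo launcher updates ...")


maybe_self_update(local_only=False)   # runs the check
os.environ["REPO_SKIP_SELF_UPDATE"] = "1"
maybe_self_update(local_only=False)   # skipped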
@@ -2172,94 +2308,132 @@ later is required to fix a server side protocol bug.
         sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
         sync_progress_thread.daemon = True
 
-        with self.ParallelContext():
-            self.get_parallel_context()[
-                "sync_dict"
-            ] = multiprocessing.Manager().dict()
-            sync_progress_thread.start()
-
-            try:
-                # Outer loop for dynamic project discovery (e.g., submodules).
-                # It continues until no unsynced projects remain.
-                while True:
-                    projects_to_sync = [
-                        p
-                        for p in project_list
-                        if p.relpath not in synced_relpaths
-                    ]
-                    if not projects_to_sync:
-                        break
-
-                    pending_relpaths = {p.relpath for p in projects_to_sync}
-                    if previously_pending_relpaths == pending_relpaths:
-                        logger.warning(
-                            "Stall detected in interleaved sync, not all "
-                            "projects could be synced."
-                        )
-                        err_event.set()
-                        break
-                    previously_pending_relpaths = pending_relpaths
-
-                    # Update the projects list for workers in the current pass.
-                    self.get_parallel_context()["projects"] = projects_to_sync
-                    project_index_map = {
-                        p: i for i, p in enumerate(projects_to_sync)
-                    }
-
-                    # Inner loop to process projects in a hierarchical order.
-                    # This iterates through levels of project dependencies (e.g.
-                    # 'foo' then 'foo/bar'). All projects in one level can be
-                    # processed in parallel, but we must wait for a level to
-                    # complete before starting the next.
-                    for level_projects in _SafeCheckoutOrder(projects_to_sync):
-                        if not level_projects:
-                            continue
-
-                        objdir_project_map = collections.defaultdict(list)
-                        for p in level_projects:
-                            objdir_project_map[p.objdir].append(
-                                project_index_map[p]
-                            )
-
-                        work_items = list(objdir_project_map.values())
-                        if not work_items:
-                            continue
-
-                        jobs = max(1, min(opt.jobs, len(work_items)))
-                        callback = functools.partial(
-                            self._ProcessSyncInterleavedResults,
-                            synced_relpaths,
-                            err_event,
-                            errors,
-                            opt,
-                        )
-                        if not self.ExecuteInParallel(
-                            jobs,
-                            functools.partial(self._InterleavedSyncWorker, opt),
-                            work_items,
-                            callback=callback,
-                            output=pm,
-                            chunksize=1,
-                        ):
-                            err_event.set()
-                            break
-
-                        if err_event.is_set() and opt.fail_fast:
-                            raise SyncFailFastError(aggregate_errors=errors)
-
-                    self._ReloadManifest(None, manifest)
-                    project_list = self.GetProjects(
-                        args,
-                        missing_ok=True,
-                        submodules_ok=opt.fetch_submodules,
-                        manifest=manifest,
-                        all_manifests=not opt.this_manifest_only,
-                    )
-            finally:
-                sync_event.set()
-                sync_progress_thread.join()
+        with multiprocessing.Manager() as manager, ssh.ProxyManager(
+            manager
+        ) as ssh_proxy:
+            ssh_proxy.sock()
+            with self.ParallelContext():
+                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                self.get_parallel_context()[
+                    "sync_dict"
+                ] = multiprocessing.Manager().dict()
+                sync_progress_thread.start()
+
+                try:
+                    # Outer loop for dynamic project discovery (e.g., submodules).
+                    # It continues until no unsynced projects remain.
+                    while True:
+                        projects_to_sync = [
+                            p
+                            for p in project_list
+                            if p.relpath not in synced_relpaths
+                        ]
+                        if not projects_to_sync:
+                            break
+
+                        pending_relpaths = {p.relpath for p in projects_to_sync}
+                        if previously_pending_relpaths == pending_relpaths:
+                            logger.warning(
+                                "Stall detected in interleaved sync, not all "
+                                "projects could be synced."
+                            )
+                            err_event.set()
+                            break
+                        previously_pending_relpaths = pending_relpaths
+
+                        # Update the projects list for workers in the current pass.
+                        self.get_parallel_context()[
+                            "projects"
+                        ] = projects_to_sync
+                        project_index_map = {
+                            p: i for i, p in enumerate(projects_to_sync)
+                        }
+
+                        # Inner loop to process projects in a hierarchical order.
+                        # This iterates through levels of project dependencies (e.g.
+                        # 'foo' then 'foo/bar'). All projects in one level can be
+                        # processed in parallel, but we must wait for a level to
+                        # complete before starting the next.
+                        for level_projects in _SafeCheckoutOrder(
+                            projects_to_sync
+                        ):
+                            if not level_projects:
+                                continue
+
+                            objdir_project_map = collections.defaultdict(list)
+                            for p in level_projects:
+                                objdir_project_map[p.objdir].append(
+                                    project_index_map[p]
+                                )
+
+                            work_items = list(objdir_project_map.values())
+                            if not work_items:
+                                continue
+
+                            jobs = max(1, min(opt.jobs, len(work_items)))
+                            callback = functools.partial(
+                                self._ProcessSyncInterleavedResults,
+                                synced_relpaths,
+                                err_event,
+                                errors,
+                                opt,
+                            )
+                            if not self.ExecuteInParallel(
+                                jobs,
+                                functools.partial(
+                                    self._InterleavedSyncWorker, opt
+                                ),
+                                work_items,
+                                callback=callback,
+                                output=pm,
+                                chunksize=1,
+                            ):
+                                err_event.set()
+
+                            if err_event.is_set() and opt.fail_fast:
+                                raise SyncFailFastError(aggregate_errors=errors)
+
+                        self._ReloadManifest(None, manifest)
+                        project_list = self.GetProjects(
+                            args,
+                            missing_ok=True,
+                            submodules_ok=opt.fetch_submodules,
+                            manifest=manifest,
+                            all_manifests=not opt.this_manifest_only,
+                        )
+                finally:
+                    sync_event.set()
+                    sync_progress_thread.join()
 
         pm.end()
 
         for m in self.ManifestList(opt):
             if m.IsMirror or m.IsArchive:
                 continue
             try:
                 self.UpdateProjectList(opt, m)
                 self.UpdateCopyLinkfileList(m)
             except Exception as e:
                 err_event.set()
                 errors.append(e)
                 if isinstance(e, DeleteWorktreeError):
                     errors.extend(e.aggregate_errors)
                 if opt.fail_fast:
                     logger.error(
                         "error: Local cleanup failed; see preceding errors."
                     )
                     raise SyncFailFastError(aggregate_errors=errors)
 
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(project_list, opt, err_event)
 
         # Print all manifest notices, but only once.
         printed_notices = set()
         for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix):
             if m.notice and m.notice not in printed_notices:
                 print(m.notice)
                 printed_notices.add(m.notice)
 
         if err_event.is_set():
             logger.error(
                 "error: Unable to fully sync the tree in interleaved mode."
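The ordering and grouping logic in the hunk above can be hard to see through the diff noise: the outer pass orders projects into dependency levels, and within each level, projects sharing a git object directory are bundled into one work item so they sync serially. A toy sketch of both ideas, where safe_checkout_levels is a simplified stand-in for repo's _SafeCheckoutOrder (which orders by parent/child containment, not raw path depth) and the objdir key is faked from the path:

import collections


def safe_checkout_levels(relpaths):
    """Simplified stand-in for _SafeCheckoutOrder: group paths by nesting
    depth so 'foo' lands in an earlier level than 'foo/bar'."""
    levels = collections.defaultdict(list)
    for rp in relpaths:
        levels[rp.count("/")].append(rp)
    return [levels[d] for d in sorted(levels)]


projects = ["foo", "foo/bar", "baz", "foo/bar/qux"]
for level in safe_checkout_levels(projects):
    # Within a level, projects sharing a git object directory are grouped
    # into one work item so they are synced serially by a single worker.
    objdir_map = collections.defaultdict(list)
    for rp in level:
        objdir_map[rp.split("/")[0]].append(rp)  # toy objdir key
    print(list(objdir_map.values()))

Levels run strictly one after another, which is what prevents a child project from being checked out before its parent directory exists; the per-objdir bundling is what avoids concurrent writes to shared object stores.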