Skip to content

Sync

Main orchestrator that moves GoPro cloud assets into Google Photos.

q2google.sync

Orchestration layer for GoPro cloud → Google Photos staged sync.

:class:GoProToPhotosSync loads or creates :class:~q2google.state.base.SessionState, then runs :class:~q2google.stages.discovery.DiscoveryStage, :class:~q2google.stages.transfer.TransferStage, and :class:~q2google.stages.create.CreateStage.

GoProToPhotosSync(gopro: AsyncGoProClient, photos: GooglePhotosClient, state_backend: SyncStateBackend, settings: Q2GoogleSettings = get_settings()) dataclass

Coordinate staged sync from GoPro cloud into Google Photos.

Attributes:

Name Type Description
gopro AsyncGoProClient

Async GoPro cloud client used during discovery.

photos GooglePhotosClient

Facade for resumable upload and mediaItems:batchCreate.

state_backend SyncStateBackend

Pluggable load/save for :class:~q2google.state.base.SessionState.

settings Q2GoogleSettings

Defaults for batch sizes, chunking, and fail_fast behavior.

__post_init__() -> None

Construct stage objects bound to this orchestrator's clients and settings.

Source code in q2google/sync.py
def __post_init__(self) -> None:
    """Construct stage objects bound to this orchestrator's clients and settings."""
    self._discovery = DiscoveryStage(self.gopro, self._persist_state)
    self._transfer = TransferStage(
        self.photos,
        self._persist_state,
        download_chunk_size=self.settings.download_chunk_size_bytes,
        cdn_sock_connect_seconds=self.settings.cdn_download_sock_connect_seconds,
        temp_dir_prefix=self.settings.temp_dir_prefix,
    )
    self._create = CreateStage(
        self.photos,
        self._persist_state,
        library_batch_size=self.settings.photos_library_batch_size,
    )

sync_date_range(start_date: datetime, end_date: datetime, *, session_id: str, batch_size: int | None = None, fail_fast: bool | None = None, on_stage_start: StageStartCallback | None = None, on_stage_complete: StageCompleteCallback | None = None, transfer_metrics: SyncTransferMetrics | None = None) -> list[MediaItemBatchCreateResponse] async

Run discovery → transfer → create for session_id.

Uses state_backend for checkpoints. When persisted state already exists, caller-supplied start_date, end_date, and batch_size are ignored in favor of the stored session.

Parameters:

Name Type Description Default
start_date datetime

Capture window start (new sessions only).

required
end_date datetime

Capture window end (new sessions only).

required
session_id str

Stable document key for load/resume.

required
batch_size int | None

Transfer batch size for new sessions; None uses settings.sync_batch_size.

None
fail_fast bool | None

When not None, overrides settings.fail_fast.

None
on_stage_start StageStartCallback | None

Optional async hook invoked immediately before each stage begins.

None
on_stage_complete StageCompleteCallback | None

Optional async hook invoked after each stage finishes (including on failure). For create, the third argument is the list returned so far from that stage; it is empty when the stage raised before returning.

None
transfer_metrics SyncTransferMetrics | None

Optional mutable counters filled during the transfer stage (CDN bytes, uploaded bytes, phase and wall times).

None

Returns:

Type Description
list[MediaItemBatchCreateResponse]

Flattened list of batch-create responses from this invocation, in batch order.

Source code in q2google/sync.py
async def sync_date_range(
    self,
    start_date: datetime,
    end_date: datetime,
    *,
    session_id: str,
    batch_size: int | None = None,
    fail_fast: bool | None = None,
    on_stage_start: StageStartCallback | None = None,
    on_stage_complete: StageCompleteCallback | None = None,
    transfer_metrics: SyncTransferMetrics | None = None,
) -> list[MediaItemBatchCreateResponse]:
    """Run discovery → transfer → create for ``session_id``.

    Uses ``state_backend`` for checkpoints. When persisted state already exists, caller-supplied
    ``start_date``, ``end_date``, and ``batch_size`` are ignored in favor of the stored session.

    Args:
        start_date: Capture window start (new sessions only).
        end_date: Capture window end (new sessions only).
        session_id: Stable document key for load/resume.
        batch_size: Transfer batch size for new sessions; ``None`` uses ``settings.sync_batch_size``.
        fail_fast: When not ``None``, overrides ``settings.fail_fast``.
        on_stage_start: Optional async hook invoked immediately before each stage begins.
        on_stage_complete: Optional async hook invoked after each stage finishes (including on
            failure). For ``create``, the third argument is the list returned so far from that
            stage; it is empty when the stage raised before returning.
        transfer_metrics: Optional mutable counters filled during the transfer stage (CDN bytes,
            uploaded bytes, phase and wall times).

    Returns:
        Flattened list of batch-create responses from this invocation, in batch order.
    """
    effective_batch = self.settings.sync_batch_size if batch_size is None else batch_size
    effective_fail_fast = self.settings.fail_fast if fail_fast is None else fail_fast

    loaded = await asyncio.to_thread(self.state_backend.load, session_id)
    if loaded is None:
        state = new_session(
            session_id,
            start_date_iso=start_date.isoformat(),
            end_date_iso=end_date.isoformat(),
            batch_size=effective_batch,
        )
        await self._persist_state(state)
    else:
        state = loaded

    all_responses: list[MediaItemBatchCreateResponse] = []

    try:
        if on_stage_start is not None:
            await on_stage_start("discovery")
        await self._discovery.run(state)
    finally:
        if on_stage_complete is not None:
            await on_stage_complete("discovery", state, None)

    try:
        if on_stage_start is not None:
            await on_stage_start("transfer")
        await self._transfer.run(
            state,
            batch_size=state.batch_size,
            fail_fast=effective_fail_fast,
            metrics=transfer_metrics,
        )
    finally:
        if on_stage_complete is not None:
            await on_stage_complete("transfer", state, None)

    batch_responses: list[MediaItemBatchCreateResponse] = []
    try:
        if on_stage_start is not None:
            await on_stage_start("create")
        batch_responses = await self._create.run(state, fail_fast=effective_fail_fast)
    finally:
        if on_stage_complete is not None:
            await on_stage_complete("create", state, batch_responses)

    all_responses.extend(batch_responses)

    logging.debug(
        "Session %s — media item responses this run: %s",
        session_id,
        [response.model_dump_json() for response in all_responses],
    )
    return all_responses

SyncTransferMetrics(bytes_downloaded: int = 0, bytes_uploaded: int = 0, seconds_downloading: float = 0.0, seconds_uploading: float = 0.0, seconds_transfer_wall: float = 0.0) dataclass

Aggregates CDN download and Google upload volume and wall-clock phase times.

Attributes:

Name Type Description
bytes_downloaded int

Sum of local file sizes produced by successful CDN GETs in this run.

bytes_uploaded int

Sum of bytes successfully sent via resumable upload (file sizes uploaded).

seconds_downloading float

Wall time spent awaiting CDN download batches (per-batch sum).

seconds_uploading float

Wall time spent awaiting resumable upload batches (per-batch sum).

seconds_transfer_wall float

Wall time for the whole transfer stage body (excluding empty skip).