State

Session persistence — protocol, models, and built-in storage backends.

See the Backends guide for usage instructions, configuration, and collection schemas for each backend.

Base protocol and models

q2google.state.base

Abstract persistence protocol and JSON-serializable session models.

Defines SyncStateBackend, the document types for staged sync, and helpers such as new_session.

BatchState(batch_index: int, file_names: list[str], status: StageStatus = 'pending', responses_json: list[str] | None = None, error: ErrorRecord | None = None) dataclass

Tracks one mediaItems:batchCreate HTTP batch within the create stage.

Attributes:

Name Type Description
batch_index int

Zero-based index among create batches for this session.

file_names list[str]

Filenames included in this batch (aligned with API order).

status StageStatus

Batch-level lifecycle state.

responses_json list[str] | None

Optional serialized per-response JSON strings after success.

error ErrorRecord | None

Populated when the batch fails as a whole.

from_dict(data: dict[str, Any]) -> BatchState classmethod

Deserialize batch metadata from persisted JSON.

Parameters:

Name Type Description Default
data dict[str, Any]

Mapping with batch_index and optional nested fields.

required

Returns:

Type Description
BatchState

Reconstructed BatchState.

Source code in q2google/state/base.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BatchState:
    """Deserialize batch metadata from persisted JSON.

    Args:
        data: Mapping with ``batch_index`` and optional nested fields.

    Returns:
        Reconstructed :class:`BatchState`.
    """
    return cls(
        batch_index=int(data["batch_index"]),
        file_names=list(data.get("file_names") or []),
        status=data.get("status", "pending"),  # type: ignore[arg-type]
        responses_json=data.get("responses_json"),
        error=data.get("error"),
    )

to_dict() -> dict[str, Any]

Serialize this batch to a plain dict.

Returns:

Type Description
dict[str, Any]

JSON-compatible mapping.

Source code in q2google/state/base.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this batch to a plain dict.

    Returns:
        JSON-compatible mapping.
    """
    return {
        "batch_index": self.batch_index,
        "file_names": list(self.file_names),
        "status": self.status,
        "responses_json": self.responses_json,
        "error": self.error,
    }

ErrorRecord

Bases: TypedDict

Structured error payload stored on an item or batch.

Attributes:

Name Type Description
error_type str

Exception type name.

message str

Human-readable error message.

attempt int

Monotonic attempt counter for retries.

updated_at str

ISO 8601 timestamp when the record was written.
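
The four fields above can be filled in from a caught exception. The following is a minimal sketch, not the library's own code: the `ErrorRecord` class is a local stand-in that mirrors the documented fields, and `record_error` is a hypothetical helper that continues the attempt counter from whatever record is already stored under the same stage key.

```python
from datetime import datetime, timezone
from typing import TypedDict


class ErrorRecord(TypedDict):
    """Local stand-in mirroring the four documented fields."""

    error_type: str
    message: str
    attempt: int
    updated_at: str


def record_error(
    errors: dict[str, ErrorRecord], stage: str, exc: BaseException
) -> ErrorRecord:
    """Store the latest failure for ``stage`` and bump the attempt counter.

    Hypothetical helper: numbering starts at 1 and continues from any
    record already stored under the same stage key.
    """
    previous = errors.get(stage)
    record: ErrorRecord = {
        "error_type": type(exc).__name__,
        "message": str(exc),
        "attempt": previous["attempt"] + 1 if previous else 1,
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }
    errors[stage] = record
    return record


# Two consecutive failures in the same stage: only the last one is kept.
errors: dict[str, ErrorRecord] = {}
record_error(errors, "transfer", TimeoutError("upload timed out"))
second = record_error(errors, "transfer", ConnectionError("connection reset"))
```

Keeping only the last record per stage matches the "stage key to last ErrorRecord" shape described for ItemState.errors; the attempt counter is what preserves the retry history.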

ItemState(file_name: str, media_id: str | None = None, download_url: str | None = None, discovery_status: StageStatus = 'pending', transfer_status: StageStatus = 'pending', create_status: ItemCreateStatus = 'pending', upload_token: str | None = None, errors: dict[str, ErrorRecord] = dict()) dataclass

Per-file progress within a sync session.

Attributes:

Name Type Description
file_name str

GoPro logical filename, used as the map key in SessionState.

media_id str | None

Optional remote identifier when known.

download_url str | None

Resolved CDN URL after discovery.

discovery_status StageStatus

Lifecycle state for URL resolution.

transfer_status StageStatus

Lifecycle state for download + upload to Google.

create_status ItemCreateStatus

Lifecycle state for batchCreate.

upload_token str | None

Finalized resumable upload token before library registration.

errors dict[str, ErrorRecord]

Map of stage key (e.g. "transfer") to the last ErrorRecord for that stage.

from_dict(data: dict[str, Any]) -> ItemState classmethod

Deserialize from a dict produced by to_dict or legacy JSON.

Parameters:

Name Type Description Default
data dict[str, Any]

Mapping with required file_name and optional status fields.

required

Returns:

Type Description
ItemState

Reconstructed ItemState.

Source code in q2google/state/base.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ItemState:
    """Deserialize from a dict produced by :meth:`to_dict` or legacy JSON.

    Args:
        data: Mapping with required ``file_name`` and optional status fields.

    Returns:
        Reconstructed :class:`ItemState`.
    """
    return cls(
        file_name=str(data["file_name"]),
        media_id=data.get("media_id"),
        download_url=data.get("download_url"),
        discovery_status=data.get("discovery_status", "pending"),  # type: ignore[arg-type]
        transfer_status=data.get("transfer_status", "pending"),  # type: ignore[arg-type]
        create_status=data.get("create_status", "pending"),  # type: ignore[arg-type]
        upload_token=data.get("upload_token"),
        errors=dict(data.get("errors") or {}),
    )

to_dict() -> dict[str, Any]

Serialize this item to a plain dict suitable for JSON.

Returns:

Type Description
dict[str, Any]

JSON-compatible mapping for persistence layers.

Source code in q2google/state/base.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this item to a plain dict suitable for JSON.

    Returns:
        JSON-compatible mapping for persistence layers.
    """
    return {
        "file_name": self.file_name,
        "media_id": self.media_id,
        "download_url": self.download_url,
        "discovery_status": self.discovery_status,
        "transfer_status": self.transfer_status,
        "create_status": self.create_status,
        "upload_token": self.upload_token,
        "errors": dict(self.errors),
    }

SessionState(schema_version: int = 1, session_id: str = '', created_at: str = '', updated_at: str = '', start_date_iso: str = '', end_date_iso: str = '', batch_size: int = 50, stages: dict[StageKey, StageStatus] = (lambda: {'discovery': 'pending', 'transfer': 'pending', 'create': 'pending'})(), items: dict[str, ItemState] = dict(), batches: dict[str, BatchState] = dict()) dataclass

Full persisted session for discovery → transfer → create.

Attributes:

Name Type Description
schema_version int

Document format version for migrations.

session_id str

Stable identifier used as storage key.

created_at str

ISO timestamp when the session was first created.

updated_at str

ISO timestamp of the last update, set via touch.

start_date_iso str

Capture window start (ISO string).

end_date_iso str

Capture window end (ISO string).

batch_size int

Transfer-stage batch size chosen at session creation.

stages dict[StageKey, StageStatus]

High-level stage keys mapped to coarse status.

items dict[str, ItemState]

Filename-keyed ItemState entries.

batches dict[str, BatchState]

Maps each batch index (as a string key) to its BatchState, for create retries.
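
To make the string-keyed batches mapping concrete, here is a minimal sketch that chunks an ordered list of filenames into batch documents. It rests on assumptions: `plan_batches` is a hypothetical helper, the filenames are invented, and nothing in this page confirms that the orchestrator derives its create batches from batch_size in exactly this way. Only the dict shape (the five BatchState fields, zero-based batch_index, string keys) comes from the reference above.

```python
from typing import Any


def plan_batches(file_names: list[str], batch_size: int) -> dict[str, dict[str, Any]]:
    """Chunk ``file_names`` in order into BatchState-shaped dicts.

    Hypothetical helper: keys are the zero-based batch index as a string.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batches: dict[str, dict[str, Any]] = {}
    for index, start in enumerate(range(0, len(file_names), batch_size)):
        batches[str(index)] = {
            "batch_index": index,
            "file_names": file_names[start:start + batch_size],
            "status": "pending",
            "responses_json": None,
            "error": None,
        }
    return batches


# Three made-up filenames with a batch size of two yield two batches.
plan = plan_batches(["GX010001.MP4", "GX010002.MP4", "GX010003.MP4"], batch_size=2)
```

String keys keep the mapping JSON-compatible, since JSON object keys are always strings.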

from_dict(data: dict[str, Any]) -> SessionState classmethod

Deserialize session JSON produced by to_dict.

Parameters:

Name Type Description Default
data dict[str, Any]

Top-level session mapping.

required

Returns:

Type Description
SessionState

Reconstructed SessionState.

Source code in q2google/state/base.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SessionState:
    """Deserialize session JSON produced by :meth:`to_dict`.

    Args:
        data: Top-level session mapping.

    Returns:
        Reconstructed :class:`SessionState`.
    """
    items_raw = data.get("items") or {}
    batches_raw = data.get("batches") or {}
    return cls(
        schema_version=int(data.get("schema_version", 1)),
        session_id=str(data.get("session_id", "")),
        created_at=str(data.get("created_at", "")),
        updated_at=str(data.get("updated_at", "")),
        start_date_iso=str(data.get("start_date_iso", "")),
        end_date_iso=str(data.get("end_date_iso", "")),
        batch_size=int(data.get("batch_size", 50)),
        stages=dict(data.get("stages") or {}),  # type: ignore[arg-type]
        items={k: ItemState.from_dict(v) for k, v in items_raw.items()},
        batches={k: BatchState.from_dict(v) for k, v in batches_raw.items()},
    )

to_dict() -> dict[str, Any]

Serialize the full session for JSON or document databases.

Returns:

Type Description
dict[str, Any]

Nested plain dict suitable for json.dumps.

Source code in q2google/state/base.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the full session for JSON or document databases.

    Returns:
        Nested plain dict suitable for :func:`json.dumps`.
    """
    return {
        "schema_version": self.schema_version,
        "session_id": self.session_id,
        "created_at": self.created_at,
        "updated_at": self.updated_at,
        "start_date_iso": self.start_date_iso,
        "end_date_iso": self.end_date_iso,
        "batch_size": self.batch_size,
        "stages": dict(self.stages),
        "items": {k: v.to_dict() for k, v in self.items.items()},
        "batches": {k: v.to_dict() for k, v in self.batches.items()},
    }
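
For orientation, the nested document that to_dict produces can be written out by hand. The sketch below uses only the standard library. The keys follow the three to_dict listings above, while every value (session id, dates, filename) is invented for illustration.

```python
import json

# Hand-written example of the persisted shape; all values are made up.
session_doc = {
    "schema_version": 1,
    "session_id": "demo-session",
    "created_at": "2024-01-01T00:00:00+00:00",
    "updated_at": "2024-01-01T00:00:00+00:00",
    "start_date_iso": "2023-12-01",
    "end_date_iso": "2023-12-31",
    "batch_size": 50,
    "stages": {"discovery": "pending", "transfer": "pending", "create": "pending"},
    "items": {
        "GX010001.MP4": {
            "file_name": "GX010001.MP4",
            "media_id": None,
            "download_url": None,
            "discovery_status": "pending",
            "transfer_status": "pending",
            "create_status": "pending",
            "upload_token": None,
            "errors": {},
        }
    },
    "batches": {
        "0": {
            "batch_index": 0,
            "file_names": ["GX010001.MP4"],
            "status": "pending",
            "responses_json": None,
            "error": None,
        }
    },
}

# The document survives a JSON round trip unchanged because it contains
# only strings, ints, None, lists, and string-keyed dicts.
encoded = json.dumps(session_doc, indent=2)
decoded = json.loads(encoded)
```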

touch() -> None

Set updated_at to the current UTC ISO timestamp.

Source code in q2google/state/base.py
def touch(self) -> None:
    """Set ``updated_at`` to the current UTC ISO timestamp."""
    self.updated_at = _utc_now_iso()
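
The `_utc_now_iso` helper called by touch is not shown on this page. A plausible sketch, assuming it returns a timezone-aware ISO 8601 string, could look like the following. The function name and the exact output format here are assumptions, not the library's implementation.

```python
from datetime import datetime, timezone


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string with an explicit offset."""
    return datetime.now(timezone.utc).isoformat()


stamp = utc_now_iso()
```

Passing `timezone.utc` to `datetime.now` yields an aware datetime, so the string carries a `+00:00` offset and can be parsed back without guessing the zone.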

SyncStateBackend

Bases: Protocol

Storage backend for SessionState.

load(session_id: str) -> SessionState | None

Load a session by id.

Parameters:

Name Type Description Default
session_id str

Same key passed to new_session or the orchestrator.

required

Returns:

Type Description
SessionState | None

Parsed state, or None if no document exists.

Source code in q2google/state/base.py
def load(self, session_id: str) -> SessionState | None:
    """Load a session by id.

    Args:
        session_id: Same key passed to :func:`new_session` / orchestrator.

    Returns:
        Parsed state, or ``None`` if no document exists.
    """

save(state: SessionState) -> None

Atomically persist state (semantics defined by the implementation).

Parameters:

Name Type Description Default
state SessionState

Complete session document to store.

required

Raises:

Type Description
OSError

Implementations may propagate IO failures from the storage layer.

Source code in q2google/state/base.py
def save(self, state: SessionState) -> None:
    """Atomically persist ``state`` (semantics defined by the implementation).

    Args:
        state: Complete session document to store.

    Raises:
        OSError: Implementations may propagate IO failures from the storage layer.
    """
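
Because the protocol is structural, any object with matching load and save methods can serve as a backend. The sketch below shows a hypothetical in-memory backend of the kind that might be useful in tests. To stay self-contained it defines a tiny stand-in `SessionState` and a local copy of the protocol rather than importing the real ones. `InMemoryBackend` and `mark_stage` are illustrative names, not part of q2google.

```python
import copy
from dataclasses import dataclass, field
from typing import Optional, Protocol


@dataclass
class SessionState:
    """Tiny stand-in for the real SessionState (only what the demo needs)."""

    session_id: str = ""
    stages: dict[str, str] = field(default_factory=dict)


class SyncStateBackend(Protocol):
    """Same two-method shape as the protocol documented above."""

    def load(self, session_id: str) -> Optional[SessionState]: ...

    def save(self, state: SessionState) -> None: ...


class InMemoryBackend:
    """Hypothetical backend that keeps sessions in a dict."""

    def __init__(self) -> None:
        self._sessions: dict[str, SessionState] = {}

    def load(self, session_id: str) -> Optional[SessionState]:
        stored = self._sessions.get(session_id)
        # Hand out a copy so callers cannot mutate stored state without save().
        return copy.deepcopy(stored) if stored is not None else None

    def save(self, state: SessionState) -> None:
        self._sessions[state.session_id] = copy.deepcopy(state)


def mark_stage(backend: SyncStateBackend, session_id: str, stage: str, status: str) -> None:
    """Load-modify-save cycle written against the protocol only."""
    state = backend.load(session_id) or SessionState(session_id=session_id)
    state.stages[stage] = status
    backend.save(state)


backend = InMemoryBackend()
mark_stage(backend, "demo", "discovery", "pending")
```

Copying on both load and save mimics the behavior of a real store, where a loaded object is detached from what is persisted until it is saved again.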

new_session(session_id: str, *, start_date_iso: str, end_date_iso: str, batch_size: int) -> SessionState

Create an empty session with timestamps and stage defaults initialized.

Parameters:

Name Type Description Default
session_id str

External key used for persistence lookups.

required
start_date_iso str

Capture window start as ISO string.

required
end_date_iso str

Capture window end as ISO string.

required
batch_size int

Transfer batch size recorded for later stages.

required

Returns:

Type Description
SessionState

New SessionState with discovery/transfer/create set to pending.

Source code in q2google/state/base.py
def new_session(
    session_id: str,
    *,
    start_date_iso: str,
    end_date_iso: str,
    batch_size: int,
) -> SessionState:
    """Create an empty session with timestamps and stage defaults initialized.

    Args:
        session_id: External key used for persistence lookups.
        start_date_iso: Capture window start as ISO string.
        end_date_iso: Capture window end as ISO string.
        batch_size: Transfer batch size recorded for later stages.

    Returns:
        New :class:`SessionState` with ``discovery``/``transfer``/``create`` set to ``pending``.
    """
    now = _utc_now_iso()
    return SessionState(
        session_id=session_id,
        created_at=now,
        updated_at=now,
        start_date_iso=start_date_iso,
        end_date_iso=end_date_iso,
        batch_size=batch_size,
    )

JSON file backend

q2google.state.local

Filesystem-backed SyncStateBackend.

Stores each session as a directory tree with one JSON file per item and one per batch, so concurrent writes to different items target different files and cannot overwrite each other. Each file is published atomically via a temp file and os.replace.

Layout::

{root}/
  {session_id}/
    meta.json               # session metadata + stages (no items, no batches)
    items/
      {safe_file_name}.json # one file per ItemState
    batches/
      {batch_key}.json      # one file per BatchState

Legacy flat-file sessions ({session_id}.json) written by older versions of this module are still readable; load detects and falls back to that format transparently.
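
The temp-file-plus-os.replace pattern mentioned above can be sketched as follows. This is an illustration of the general technique, not the module's private `_atomic_write`: the function name, the `.tmp` suffix, and the fsync call are assumptions.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` to ``path`` so readers never see a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so the final rename stays on
    # one filesystem, which is what makes os.replace atomic.
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())  # push bytes to disk before publishing
        os.replace(tmp_name, path)  # atomically swap the new file into place
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise


# Demo in a throwaway directory: the second write replaces the first.
with tempfile.TemporaryDirectory() as root:
    target = Path(root) / "demo-session" / "items" / "GX010001.MP4.json"
    atomic_write_json(target, {"file_name": "GX010001.MP4", "upload_token": None})
    atomic_write_json(target, {"file_name": "GX010001.MP4", "upload_token": "tok"})
    written = json.loads(target.read_text(encoding="utf-8"))
    leftovers = [p.name for p in target.parent.iterdir() if p.name.endswith(".tmp")]
```

Giving the temp file a suffix other than `.json` means a reader that globs `*.json` in the same directory would not pick up a half-written file.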

JsonFileBackend(root: str | Path)

Store each session under {root}/{session_id}/ as a directory of JSON files.

Each ItemState and BatchState is written to its own file so that concurrent writers updating different items never conflict. Session-level metadata (stages, timestamps) lives in meta.json and is still subject to last-write-wins semantics; because stage transitions are sequential in the current orchestrator, this is not a practical concern.

Create the backend and ensure root exists.

Parameters:

Name Type Description Default
root str | Path

Directory that will contain per-session subdirectories.

required

Source code in q2google/state/local.py
def __init__(self, root: str | Path) -> None:
    """Create the backend and ensure ``root`` exists.

    Args:
        root: Directory that will contain per-session subdirectories.
    """
    self._root = Path(root)
    self._root.mkdir(parents=True, exist_ok=True)

load(session_id: str) -> SessionState | None

Load SessionState for session_id from disk.

If a legacy flat file ({session_id}.json) exists, it is loaded first; otherwise the session directory is read. Sessions written by older versions of this module therefore remain readable without any migration step.

Parameters:

Name Type Description Default
session_id str

Session key used when saving.

required

Returns:

Type Description
SessionState | None

Parsed state, or None if neither the directory nor the legacy file exists.

Raises:

Type Description
JSONDecodeError

If any JSON file on disk is malformed.

Source code in q2google/state/local.py
def load(self, session_id: str) -> SessionState | None:
    """Load ``SessionState`` for ``session_id`` from disk.

    Falls back to the legacy flat-file format (``{session_id}.json``) when the
    session directory does not exist, so sessions written by older versions of this
    module remain readable without any migration step.

    Args:
        session_id: Session key used when saving.

    Returns:
        Parsed state, or ``None`` if neither the directory nor the legacy file exists.

    Raises:
        json.JSONDecodeError: If any JSON file on disk is malformed.
    """
    legacy = self._root / f"{_safe_name(session_id)}.json"
    if legacy.is_file():
        return SessionState.from_dict(json.loads(legacy.read_text(encoding="utf-8")))

    meta_path = self._meta_path(session_id)
    if not meta_path.is_file():
        return None

    session_dir = self._session_dir(session_id)
    data: dict[str, Any] = json.loads(meta_path.read_text(encoding="utf-8"))

    data["items"] = {
        d["file_name"]: d
        for p in (session_dir / "items").glob("*.json")
        for d in (json.loads(p.read_text(encoding="utf-8")),)
    }
    data["batches"] = {
        str(d["batch_index"]): d
        for p in (session_dir / "batches").glob("*.json")
        for d in (json.loads(p.read_text(encoding="utf-8")),)
    }
    return SessionState.from_dict(data)

save(state: SessionState) -> None

Persist state by writing metadata, items, and batches to separate files.

Each file is written atomically. Writers updating different items never conflict because they target distinct paths.

Parameters:

Name Type Description Default
state SessionState

Complete session document to store.

required

Raises:

Type Description
OSError

On failure to write any individual file.

Source code in q2google/state/local.py
def save(self, state: SessionState) -> None:
    """Persist ``state`` by writing metadata, items, and batches to separate files.

    Each file is written atomically.  Writers updating different items never
    conflict because they target distinct paths.

    Args:
        state: Complete session document to store.

    Raises:
        OSError: On failure to write any individual file.
    """
    full = state.to_dict()
    meta = {k: full[k] for k in _METADATA_KEYS}
    self._atomic_write(self._meta_path(state.session_id), meta)

    for file_name, item in full["items"].items():
        self._atomic_write(self._item_path(state.session_id, file_name), item)

    for batch_key, batch in full["batches"].items():
        self._atomic_write(self._batch_path(state.session_id, str(batch_key)), batch)

MongoDB backend

q2google.state.mongo

MongoDB-backed SyncStateBackend.

Stores session state across three collections that mirror the filesystem layout used by JsonFileBackend:

  • sessions — one document per session containing metadata and stage statuses.
  • items — one document per (session_id, file_name) pair.
  • batches — one document per (session_id, batch_index) pair.

The database name is parsed from the URI path component (mongodb://host:27017/q2google → database q2google). When the path is absent or /, the name defaults to q2google.

Requires the pymongo package (pip install q2google[mongo]).

Layout::

<database>/
  sessions   { session_id, schema_version, created_at, updated_at,
               start_date_iso, end_date_iso, batch_size, stages }
  items      { session_id, file_name, media_id, download_url,
               discovery_status, transfer_status, create_status,
               upload_token, errors }
  batches    { session_id, batch_index, file_names, status,
               responses_json, error }
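
The database-name rule described above (take the URI path component, fall back to q2google when it is absent or just `/`) can be sketched with the standard library. The private `_db_name_from_uri` helper is not shown on this page, so the function below is an assumption about its behavior rather than a copy of it.

```python
from urllib.parse import urlsplit

DEFAULT_DB = "q2google"


def db_name_from_uri(uri: str) -> str:
    """Return the database named in the URI path, or the default."""
    # urlsplit keeps the query string out of .path, so options such as
    # "?retryWrites=true" never leak into the database name.
    name = urlsplit(uri).path.lstrip("/")
    return name or DEFAULT_DB


explicit = db_name_from_uri("mongodb://host:27017/photos?retryWrites=true")
missing = db_name_from_uri("mongodb://host:27017")
slash_only = db_name_from_uri("mongodb://host:27017/")
```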

MongoBackend(uri: str)

Store each session across three MongoDB collections.

Each ItemState and BatchState is upserted to its own document, matching the per-file isolation of JsonFileBackend. Session-level metadata lives in the sessions collection.

Indexes are created lazily the first time save is called on a new instance, so the collections are usable without a separate setup step.
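
The lazy, once-per-instance index creation can be sketched as a guarded call. The private `_ensure_indexes` method is not shown here, so the index set below (one unique index per collection, matching the upsert filters used by save) is an assumption. `LazyIndexes` and `RecordingCollection` are hypothetical names. The only pymongo API relied on is `Collection.create_index(keys, unique=True)`, exercised here through a test double so the sketch runs without a server.

```python
from typing import Any


class LazyIndexes:
    """Hypothetical helper that creates indexes at most once per instance."""

    def __init__(self, sessions: Any, items: Any, batches: Any) -> None:
        self._sessions = sessions
        self._items = items
        self._batches = batches
        self._ensured = False

    def ensure(self) -> None:
        if self._ensured:
            return
        # Assumed index set: unique keys mirroring the upsert filters (1 = ascending).
        self._sessions.create_index([("session_id", 1)], unique=True)
        self._items.create_index([("session_id", 1), ("file_name", 1)], unique=True)
        self._batches.create_index([("session_id", 1), ("batch_index", 1)], unique=True)
        self._ensured = True


class RecordingCollection:
    """Test double standing in for a pymongo collection."""

    def __init__(self) -> None:
        self.calls: list[tuple[list[tuple[str, int]], dict[str, Any]]] = []

    def create_index(self, keys: list[tuple[str, int]], **kwargs: Any) -> str:
        self.calls.append((keys, kwargs))
        return "_".join(f"{name}_{direction}" for name, direction in keys)


sessions, items, batches = RecordingCollection(), RecordingCollection(), RecordingCollection()
indexes = LazyIndexes(sessions, items, batches)
indexes.ensure()
indexes.ensure()  # second call returns early without touching the collections
```

The boolean guard only avoids repeated round trips within one process; against a real server, creating an index that already exists with the same options is a no-op.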

Create the backend and connect to MongoDB.

Parameters:

Name Type Description Default
uri str

MongoDB connection string. The database name is parsed from the URI path component; defaults to "q2google" when absent.

required

Raises:

Type Description
ImportError

When pymongo is not installed. Install it with pip install q2google[mongo].

Source code in q2google/state/mongo.py
def __init__(self, uri: str) -> None:
    """Create the backend and connect to MongoDB.

    Args:
        uri: MongoDB connection string. The database name is parsed from the URI
            path component; defaults to ``"q2google"`` when absent.

    Raises:
        ImportError: When ``pymongo`` is not installed. Install it with
            ``pip install q2google[mongo]``.
    """
    try:
        import pymongo
    except ImportError as exc:
        raise ImportError("MongoBackend requires pymongo. Install it with: pip install q2google[mongo]") from exc

    self._client: pymongo.MongoClient[dict[str, Any]] = pymongo.MongoClient(uri)
    self._db: pymongo.database.Database[dict[str, Any]] = self._client[_db_name_from_uri(uri)]
    self._sessions: pymongo.collection.Collection[dict[str, Any]] = self._db["sessions"]
    self._items: pymongo.collection.Collection[dict[str, Any]] = self._db["items"]
    self._batches: pymongo.collection.Collection[dict[str, Any]] = self._db["batches"]
    self._indexes_ensured = False

load(session_id: str) -> SessionState | None

Load SessionState for session_id from MongoDB.

Fetches the session metadata document from sessions, then all item documents from items and all batch documents from batches for the same session_id.

Parameters:

Name Type Description Default
session_id str

Session key used when saving.

required

Returns:

Type Description
SessionState | None

Reconstructed SessionState, or None if no session document exists for session_id.

Source code in q2google/state/mongo.py
def load(self, session_id: str) -> SessionState | None:
    """Load ``SessionState`` for ``session_id`` from MongoDB.

    Fetches the session metadata document from ``sessions``, then all item
    documents from ``items`` and all batch documents from ``batches`` for the
    same ``session_id``.

    Args:
        session_id: Session key used when saving.

    Returns:
        Reconstructed :class:`~q2google.state.base.SessionState`, or ``None`` if
        no session document exists for ``session_id``.
    """
    meta = self._sessions.find_one({"session_id": session_id}, {"_id": 0})
    if meta is None:
        return None

    data: dict[str, Any] = dict(meta)

    data["items"] = {doc["file_name"]: doc for doc in self._items.find({"session_id": session_id}, {"_id": 0})}
    data["batches"] = {
        str(doc["batch_index"]): doc for doc in self._batches.find({"session_id": session_id}, {"_id": 0})
    }

    return SessionState.from_dict(data)

save(state: SessionState) -> None

Persist state by upserting documents into all three collections.

Each item and batch is upserted independently, so concurrent writers updating different items never conflict.

Parameters:

Name Type Description Default
state SessionState

Complete session document to store.

required

Source code in q2google/state/mongo.py
def save(self, state: SessionState) -> None:
    """Persist ``state`` by upserting documents into all three collections.

    Each item and batch is upserted independently, so concurrent writers
    updating different items never conflict.

    Args:
        state: Complete session document to store.
    """
    self._ensure_indexes()

    full = state.to_dict()
    sid = state.session_id

    meta = {k: full[k] for k in _METADATA_KEYS}
    self._sessions.replace_one({"session_id": sid}, meta, upsert=True)

    for item_doc in full["items"].values():
        self._items.replace_one(
            {"session_id": sid, "file_name": item_doc["file_name"]},
            {"session_id": sid, **item_doc},
            upsert=True,
        )

    for batch_doc in full["batches"].values():
        self._batches.replace_one(
            {"session_id": sid, "batch_index": batch_doc["batch_index"]},
            {"session_id": sid, **batch_doc},
            upsert=True,
        )