Photos

Resumable upload facade and Google Photo Library port protocol.

q2google.photos

High-level Google Photos Library helpers built on q2google.gphotos.

Keeps direct gphotos imports localized to this module; callers should use GooglePhotosClient, GooglePhotoLibraryPort, and the re-exported types.

GooglePhotoLibraryPort

Bases: Protocol

Protocol for Library v1 resumable upload and mediaItems:batchCreate.

Implemented by GooglePhotosAPI (q2google.gphotos.api) when used through GooglePhotosClient.

create_media_item(media_item: MediaItemBatchCreateRequest) -> MediaItemBatchCreateResponse async

Create library media items from finalized upload tokens.

Parameters:

Name Type Description Default
media_item MediaItemBatchCreateRequest

Batch request wrapping newMediaItems (up to API limit).

required

Returns:

Type Description
MediaItemBatchCreateResponse

API response with per-item status and identifiers.

Source code in q2google/photos.py
async def create_media_item(self, media_item: MediaItemBatchCreateRequest) -> MediaItemBatchCreateResponse:
    """Create library media items from finalized upload tokens.

    Args:
        media_item: Batch request wrapping ``newMediaItems`` (up to API limit).

    Returns:
        API response with per-item status and identifiers.
    """
    ...

init_upload_session(content_type: str, content_length: int) -> ResumableUploadSession async

Start a resumable upload and return the session (URL, token, chunk rules).

Parameters:

Name Type Description Default
content_type str

MIME type of the bytes that will be uploaded.

required
content_length int

Total size in bytes of the object to upload.

required

Returns:

Type Description
ResumableUploadSession

Session descriptor including upload_url and chunk granularity.

Source code in q2google/photos.py
async def init_upload_session(self, content_type: str, content_length: int) -> ResumableUploadSession:
    """Start a resumable upload and return the session (URL, token, chunk rules).

    Args:
        content_type: MIME type of the bytes that will be uploaded.
        content_length: Total size in bytes of the object to upload.

    Returns:
        Session descriptor including ``upload_url`` and chunk granularity.
    """
    ...

upload_chunk(url: str, command: str, offset: int, content: bytes) -> ResumableUploadSession async

POST one chunk to the resumable upload URL.

Parameters:

Name Type Description Default
url str

upload_url from init_upload_session.

required
command str

Upload command header value (for example "upload" or "upload, finalize" for the last chunk).

required
offset int

Byte offset of this chunk in the full object.

required
content bytes

Raw chunk bytes.

required

Returns:

Type Description
ResumableUploadSession

Updated session state; the last call should include the upload token needed for batchCreate.

Source code in q2google/photos.py
async def upload_chunk(self, url: str, command: str, offset: int, content: bytes) -> ResumableUploadSession:
    """POST one chunk to the resumable upload URL.

    Args:
        url: ``upload_url`` from ``init_upload_session``.
        command: Upload command header value (for example ``"upload"`` or
            ``"upload, finalize"`` for the last chunk).
        offset: Byte offset of this chunk in the full object.
        content: Raw chunk bytes.

    Returns:
        Updated session state; the last call should include the upload token
        needed for ``batchCreate``.
    """
    ...
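
Because the port is a Protocol, any object with matching async methods can stand in for the real API, which is convenient in tests. The following is a minimal sketch under stated assumptions: `FakeSession`, `UploadPort`, and `InMemoryPort` are hypothetical stand-ins written for illustration, not the q2google types, and only two of the three port methods are modeled.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class FakeSession:
    """Hypothetical stand-in for ResumableUploadSession."""

    upload_url: str
    granularity: int | None = None
    upload_token: str | None = None


class UploadPort(Protocol):
    """Simplified, hypothetical subset of GooglePhotoLibraryPort."""

    async def init_upload_session(self, content_type: str, content_length: int) -> FakeSession: ...

    async def upload_chunk(self, url: str, command: str, offset: int, content: bytes) -> FakeSession: ...


@dataclass
class InMemoryPort:
    """Collects uploaded bytes in memory instead of calling Google."""

    received: bytearray = field(default_factory=bytearray)

    async def init_upload_session(self, content_type: str, content_length: int) -> FakeSession:
        return FakeSession(upload_url="memory://upload", granularity=4)

    async def upload_chunk(self, url: str, command: str, offset: int, content: bytes) -> FakeSession:
        # Reject chunks that do not continue exactly where the previous one ended.
        if offset != len(self.received):
            raise ValueError("offset does not match bytes received so far")
        self.received.extend(content)
        # Only the finalizing call hands back a token in this fake.
        token = "fake-token" if "finalize" in command else None
        return FakeSession(upload_url=url, upload_token=token)


async def demo(port: UploadPort) -> FakeSession:
    session = await port.init_upload_session("image/jpeg", 6)
    await port.upload_chunk(session.upload_url, "upload", 0, b"abcd")
    return await port.upload_chunk(session.upload_url, "upload, finalize", 4, b"ef")


port = InMemoryPort()
final = asyncio.run(demo(port))
```

Structural typing means `InMemoryPort` never has to inherit from the protocol class; it only needs methods with compatible signatures.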

GooglePhotosAPI(credentials: GooglePhotosOAuth, timeout: float = 600.0)

Thin aiohttp wrapper around selected Google Photos Library v1 endpoints.

Instantiate and use async with GooglePhotosAPI(...) as api to obtain a live session. HTTP error responses are surfaced via response.raise_for_status() (aiohttp client errors).

Create a client; the HTTP session starts on async context enter.

Parameters:

Name Type Description Default
credentials GooglePhotosOAuth

OAuth credentials carrying a valid access token.

required
timeout float

Per-request total timeout in seconds. Each resumable chunk is one request; large uploads often need a generous value.

600.0
Source code in q2google/gphotos/api.py
def __init__(self, credentials: GooglePhotosOAuth, timeout: float = 600.0) -> None:
    """Create a client; the HTTP session starts on async context enter.

    Args:
        credentials: OAuth credentials carrying a valid access token.
        timeout: Per-request total timeout in seconds. Each resumable chunk is one request;
            large uploads often need a generous value.
    """
    self._credentials = credentials
    self._token: str | None = None
    self._timeout = aiohttp.ClientTimeout(total=timeout)
    self._session: aiohttp.ClientSession | None = None

base_url: str property

Root URL for Library API v1 resources for this client.

__aenter__() -> GooglePhotosAPI async

Open the underlying aiohttp client session.

Source code in q2google/gphotos/api.py
async def __aenter__(self) -> GooglePhotosAPI:
    """Open the underlying ``aiohttp`` client session."""
    logging.info("Creating aiohttp client session")
    self._session = aiohttp.ClientSession(timeout=self._timeout)
    return self

__aexit__(exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None) -> None async

Close the client session.

Parameters:

Name Type Description Default
exc_type type[BaseException] | None

Exception type if the context exited due to an error, else None.

required
exc BaseException | None

Active exception instance when exiting with an error, else None.

required
tb TracebackType | None

Traceback associated with exc, if any.

required
Source code in q2google/gphotos/api.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the client session.

    Args:
        exc_type: Exception type if the context exited due to an error, else ``None``.
        exc: Active exception instance when exiting with an error, else ``None``.
        tb: Traceback associated with ``exc``, if any.
    """
    if self._session is not None:
        await self._session.close()
        self._session = None
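
The session only exists between `__aenter__` and `__aexit__`, so request methods have to guard against use outside `async with`. The source calls a `_get_session_or_raise` helper whose body is not shown on this page; the sketch below assumes it raises when no session is open. `SessionGuard` is a hypothetical name, the exception type is an assumption, and a plain `object()` stands in for `aiohttp.ClientSession`.

```python
import asyncio


class SessionGuard:
    """Hypothetical stand-in mirroring the open/close lifecycle described above."""

    def __init__(self) -> None:
        self._session: object | None = None

    async def __aenter__(self) -> "SessionGuard":
        self._session = object()  # placeholder for a real HTTP client session
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self._session = None

    def get_session_or_raise(self) -> object:
        # Assumed behavior: fail loudly rather than issue requests without a session.
        if self._session is None:
            raise RuntimeError("use 'async with' before making requests")
        return self._session


async def demo() -> list[str]:
    guard = SessionGuard()
    states: list[str] = []
    try:
        guard.get_session_or_raise()
    except RuntimeError:
        states.append("closed")
    async with guard:
        guard.get_session_or_raise()
        states.append("open")
    try:
        guard.get_session_or_raise()
    except RuntimeError:
        states.append("closed")
    return states


states = asyncio.run(demo())
```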

create_media_item(media_item: MediaItemBatchCreateRequest) -> MediaItemBatchCreateResponse async

Call mediaItems:batchCreate with the given payload.

Parameters:

Name Type Description Default
media_item MediaItemBatchCreateRequest

Batch create request body.

required

Returns:

Type Description
MediaItemBatchCreateResponse

API batch create response.

Source code in q2google/gphotos/api.py
async def create_media_item(self, media_item: MediaItemBatchCreateRequest) -> MediaItemBatchCreateResponse:
    """Call ``mediaItems:batchCreate`` with the given payload.

    Args:
        media_item: Batch create request body.

    Returns:
        API batch create response.
    """
    logging.info("Creating media item")
    session = self._get_session_or_raise()
    async with session.post(
        self.base_url + "mediaItems:batchCreate",
        headers={
            **self._auth_headers(),
            "Content-type": "application/json",
        },
        json=media_item.model_dump(),
    ) as response:
        logging.info(response.headers)
        response.raise_for_status()
        data = await response.json()
        logging.info(data)
        return MediaItemBatchCreateResponse.model_validate(data)

get_media_item(media_item_id: str) -> dict[str, Any] async

GET a single media item by id (mediaItems/{mediaItemId}).

Parameters:

Name Type Description Default
media_item_id str

Library media item id.

required

Returns:

Type Description
dict[str, Any]

Parsed JSON object as returned by the API (shape matches the MediaItem resource).

Source code in q2google/gphotos/api.py
async def get_media_item(self, media_item_id: str) -> dict[str, Any]:
    """GET a single media item by id (``mediaItems/{mediaItemId}``).

    Args:
        media_item_id: Library media item id.

    Returns:
        Parsed JSON object as returned by the API (shape matches the MediaItem resource).
    """
    logging.info(f"Getting media item with ID: {media_item_id}")
    session = self._get_session_or_raise()
    url = self.base_url + f"mediaItems/{media_item_id}"
    async with session.get(
        url,
        headers={
            **self._auth_headers(),
            "Content-type": "application/json",
        },
    ) as response:
        response.raise_for_status()
        data = await response.json()
        logging.info(data)
        return data

init_upload_session(content_type: str, content_length: int) -> ResumableUploadSession async

Start a resumable upload; maps response headers to a session model.

Parameters:

Name Type Description Default
content_type str

MIME type of the file to upload.

required
content_length int

Total byte size of the file.

required

Returns:

Type Description
ResumableUploadSession

Parsed upload session metadata from response headers.

Source code in q2google/gphotos/api.py
async def init_upload_session(self, content_type: str, content_length: int) -> ResumableUploadSession:
    """Start a resumable upload; maps response headers to a session model.

    Args:
        content_type: MIME type of the file to upload.
        content_length: Total byte size of the file.

    Returns:
        Parsed upload session metadata from response headers.
    """
    logging.info("Initializing upload session")
    session = self._get_session_or_raise()
    headers = {
        **self._auth_headers(),
        "Content-Length": "0",
        "Content-Type": "application/octet-stream",
        "X-Goog-Upload-Command": "start",
        "X-Goog-Upload-Content-Type": content_type,
        "X-Goog-Upload-Protocol": "resumable",
        "X-Goog-Upload-Raw-Size": str(content_length),
    }
    async with session.post(self.base_url + "uploads", headers=headers) as response:
        logging.info(response.headers)
        logging.info(await response.text())
        response.raise_for_status()
        return ResumableUploadSession.model_validate(dict(response.headers))
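
The start request carries no body; everything the server needs travels in headers. The sketch below rebuilds that header set as a pure function so it can be inspected without network access. `start_headers` is a hypothetical helper, and the `Authorization: Bearer ...` header is an assumption about what `_auth_headers()` returns, since its body is not shown on this page.

```python
def start_headers(access_token: str, content_type: str, content_length: int) -> dict[str, str]:
    """Build headers for the resumable-upload start command (illustrative only)."""
    return {
        # Assumption: the real client sends an OAuth bearer token here.
        "Authorization": f"Bearer {access_token}",
        # The start request itself has an empty body.
        "Content-Length": "0",
        "Content-Type": "application/octet-stream",
        "X-Goog-Upload-Command": "start",
        # MIME type and size describe the file that will follow in later chunks.
        "X-Goog-Upload-Content-Type": content_type,
        "X-Goog-Upload-Protocol": "resumable",
        "X-Goog-Upload-Raw-Size": str(content_length),
    }


headers = start_headers("example-token", "image/jpeg", 1024)
```

Note that `Content-Length` describes the empty start request, while `X-Goog-Upload-Raw-Size` describes the whole file.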

query_upload_status(url: str) -> ResumableUploadSession async

Send an upload query command and return parsed header state.

Parameters:

Name Type Description Default
url str

Resumable upload URL to query.

required

Returns:

Type Description
ResumableUploadSession

Parsed session metadata from response headers.

Source code in q2google/gphotos/api.py
async def query_upload_status(self, url: str) -> ResumableUploadSession:
    """Send an upload ``query`` command and return parsed header state.

    Args:
        url: Resumable upload URL to query.

    Returns:
        Parsed session metadata from response headers.
    """
    logging.info("Querying upload status")
    session = self._get_session_or_raise()
    async with session.post(
        url,
        headers={
            **self._auth_headers(),
            "Content-Length": str(0),
            "X-Goog-Upload-Command": "query",
        },
    ) as response:
        logging.info(response.headers)
        body = await response.text()
        logging.info(body)
        response.raise_for_status()
        return ResumableUploadSession.model_validate(dict(response.headers))

upload_chunk(url: str, command: str, offset: int, content: bytes) -> ResumableUploadSession async

POST one chunk of a resumable upload.

Parameters:

Name Type Description Default
url str

Resumable upload URL from a prior session.

required
command str

Google resumable upload command (for example "upload", or "upload, finalize" for the last chunk).

required
offset int

Byte offset for this chunk.

required
content bytes

Raw chunk bytes.

required

Returns:

Type Description
ResumableUploadSession

Parsed session state; when the upload completes, upload_token may be set from the response body.

Source code in q2google/gphotos/api.py
async def upload_chunk(self, url: str, command: str, offset: int, content: bytes) -> ResumableUploadSession:
    """POST one chunk of a resumable upload.

    Args:
        url: Resumable upload URL from a prior session.
        command: Google resumable upload command (e.g. ``"upload"`` or ``"upload, finalize"``).
        offset: Byte offset for this chunk.
        content: Raw chunk bytes.

    Returns:
        Parsed session state; when the upload completes, ``upload_token`` may be set from
        the response body.
    """
    logging.info(f"Uploading chunk {offset} to {url}")
    session = self._get_session_or_raise()
    async with session.post(
        url,
        headers={
            **self._auth_headers(),
            "Content-Length": str(len(content)),
            "Content-Type": "application/octet-stream",
            "X-Goog-Upload-Command": command,
            "X-Goog-Upload-Offset": str(offset),
        },
        data=content,
    ) as response:
        logging.info(response.headers)
        body = await response.text()
        logging.info(body)
        response.raise_for_status()
        return ResumableUploadSession.model_validate(dict(response.headers, upload_token=body))

GooglePhotosClient(api: GooglePhotoLibraryPort, chunk_granularity_multiplier: int = 4) dataclass

Stream local files via resumable upload and register them with batchCreate.

Attributes:

Name Type Description
api GooglePhotoLibraryPort

Async port implementing upload session and batch create (typically GooglePhotosAPI).

chunk_granularity_multiplier int

Factor applied to server granularity to size upload chunks; must be >= 1.

create_media_items(media_items: list[NewMediaItem]) -> list[MediaItemBatchCreateResponse] async

Call mediaItems:batchCreate in slices of at most LIBRARY_BATCH_SIZE items.

Parameters:

Name Type Description Default
media_items list[NewMediaItem]

Full list of NewMediaItem to register in the library.

required

Returns:

Type Description
list[MediaItemBatchCreateResponse]

One MediaItemBatchCreateResponse per HTTP batch, in order.

Source code in q2google/photos.py
async def create_media_items(self, media_items: list[NewMediaItem]) -> list[MediaItemBatchCreateResponse]:
    """Call ``mediaItems:batchCreate`` in slices of at most ``LIBRARY_BATCH_SIZE`` items.

    Args:
        media_items: Full list of ``NewMediaItem`` to register in the library.

    Returns:
        One ``MediaItemBatchCreateResponse`` per HTTP batch, in order.
    """
    out: list[MediaItemBatchCreateResponse] = []
    for i in range(0, len(media_items), LIBRARY_BATCH_SIZE):
        batch_media_items = media_items[i : i + LIBRARY_BATCH_SIZE]
        media_item_request = MediaItemBatchCreateRequest(newMediaItems=batch_media_items)
        out.append(await self.api.create_media_item(media_item_request))
    return out
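
The slicing in `create_media_items` can be illustrated in isolation. The value of `LIBRARY_BATCH_SIZE` is not shown on this page; the sketch assumes 50, and `slices` is a hypothetical helper name.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def slices(items: Sequence[T], batch_size: int = 50) -> list[Sequence[T]]:
    """Split items into consecutive batches of at most batch_size (assumed limit)."""
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


batch_lengths = [len(batch) for batch in slices(list(range(120)))]
empty = slices([])
```

With 120 items this yields batches of 50, 50, and 20, so the caller gets three responses back; an empty input produces no HTTP calls at all.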

create_media_items_from_upload_sessions(file_names: Sequence[str], sessions: Sequence[ResumableUploadSession]) -> list[MediaItemBatchCreateResponse] async

Pair each local filename with its finalized upload session and call batchCreate.

file_names and sessions must align (same length, same order as uploads).

Parameters:

Name Type Description Default
file_names Sequence[str]

Display names for created media items.

required
sessions Sequence[ResumableUploadSession]

Finalized upload sessions whose upload_token matches each name.

required

Returns:

Type Description
list[MediaItemBatchCreateResponse]

Batch create responses from create_media_items.

Source code in q2google/photos.py
async def create_media_items_from_upload_sessions(
    self,
    file_names: Sequence[str],
    sessions: Sequence[ResumableUploadSession],
) -> list[MediaItemBatchCreateResponse]:
    """Pair each local filename with its finalized upload session and call ``batchCreate``.

    ``file_names`` and ``sessions`` must align (same length, same order as uploads).

    Args:
        file_names: Display names for created media items.
        sessions: Finalized upload sessions whose ``upload_token`` matches each name.

    Returns:
        Batch create responses from ``create_media_items``.
    """
    media_items = [
        NewMediaItem(
            simpleMediaItem=SimpleMediaItem(
                fileName=file_name,
                uploadToken=session.upload_token,
            ),
        )
        for file_name, session in zip(file_names, sessions)
    ]
    return await self.create_media_items(media_items)
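
Plain `zip` stops at the shorter input, so mismatched `file_names` and `sessions` would silently drop items rather than fail. A caller that wants an early error could validate before calling this method. The sketch below is one possible pre-check, not part of q2google; `pair_names_with_tokens` is a hypothetical helper that works on bare tokens instead of session objects.

```python
from __future__ import annotations

from typing import Sequence


def pair_names_with_tokens(
    file_names: Sequence[str],
    tokens: Sequence[str | None],
) -> list[tuple[str, str]]:
    """Pair names with upload tokens, failing on misalignment or missing tokens."""
    if len(file_names) != len(tokens):
        raise ValueError(f"{len(file_names)} names but {len(tokens)} sessions")
    pairs: list[tuple[str, str]] = []
    for name, token in zip(file_names, tokens):
        if not token:
            # A session that was never finalized carries no usable token.
            raise ValueError(f"no upload token for {name!r}")
        pairs.append((name, token))
    return pairs


# What plain zip does with misaligned inputs: the second name is dropped.
truncated = list(zip(["a.jpg", "b.jpg"], ["tok-a"]))
pairs = pair_names_with_tokens(["a.jpg", "b.jpg"], ["tok-a", "tok-b"])
```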

upload_file_path(file_name: str, path: Path) -> ResumableUploadSession async

Stream path to Google using resumable upload chunking.

Chunk size is chunk_granularity_multiplier * granularity from the session when present; the final chunk may be smaller and uses the "upload, finalize" command.

Parameters:

Name Type Description Default
file_name str

Logical name used only for MIME guessing (not sent as path).

required
path Path

Readable file on disk whose size defines Content-Length.

required

Returns:

Type Description
ResumableUploadSession

Final ResumableUploadSession after the finalize chunk (includes token).

Raises:

Type Description
RuntimeError

If the session response omits chunk granularity metadata.

ValueError

If chunk_granularity_multiplier is less than 1.

Source code in q2google/photos.py
async def upload_file_path(
    self,
    file_name: str,
    path: Path,
) -> ResumableUploadSession:
    """Stream ``path`` to Google using resumable upload chunking.

    Chunk size is ``chunk_granularity_multiplier * granularity`` from the
    session when present; the final chunk may be smaller and uses the
    ``\"upload, finalize\"`` command.

    Args:
        file_name: Logical name used only for MIME guessing (not sent as path).
        path: Readable file on disk whose size defines ``Content-Length``.

    Returns:
        Final ``ResumableUploadSession`` after the finalize chunk (includes token).

    Raises:
        RuntimeError: If the session response omits chunk granularity metadata.
        ValueError: If ``chunk_granularity_multiplier`` is less than 1.
    """
    if self.chunk_granularity_multiplier < 1:
        msg = "chunk_granularity_multiplier must be >= 1"
        raise ValueError(msg)

    content_length = path.stat().st_size
    guessed = mimetypes.guess_type(file_name)[0]
    content_type = guessed if guessed is not None else "application/octet-stream"

    offset = 0
    upload = await self.api.init_upload_session(content_type, content_length)
    url = upload.upload_url
    if upload.granularity is None:
        msg = "Upload session missing X-Goog-Upload-Chunk-Granularity"
        raise RuntimeError(msg)

    granularity = self.chunk_granularity_multiplier * upload.granularity
    total_chunks = content_length // granularity

    async with aiofiles.open(path, "rb") as body:
        for _ in range(total_chunks):
            chunk = await body.read(granularity)
            await self.api.upload_chunk(url, "upload", offset, chunk)
            offset += len(chunk)

        chunk = await body.read()
        return await self.api.upload_chunk(url, "upload, finalize", offset, chunk)
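
The chunking arithmetic above can be checked without touching the network. The sketch below mirrors the loop as a pure function that returns the planned `(command, offset, size)` triples; `plan_chunks` is a hypothetical helper written for illustration.

```python
def plan_chunks(content_length: int, granularity: int, multiplier: int = 4) -> list[tuple[str, int, int]]:
    """Return (command, offset, size) for each request, following the loop above."""
    if multiplier < 1:
        raise ValueError("multiplier must be >= 1")
    chunk_size = multiplier * granularity
    full_chunks = content_length // chunk_size

    plan: list[tuple[str, int, int]] = []
    offset = 0
    for _ in range(full_chunks):
        plan.append(("upload", offset, chunk_size))
        offset += chunk_size
    # Whatever remains (possibly nothing) goes out with the finalize command.
    plan.append(("upload, finalize", offset, content_length - offset))
    return plan


uneven = plan_chunks(content_length=10, granularity=2, multiplier=2)
exact = plan_chunks(content_length=8, granularity=2, multiplier=2)
```

When the file size is an exact multiple of the chunk size, as in `exact`, every byte is sent with plain `"upload"` commands and the finalize request carries zero bytes.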

GooglePhotosOAuth(client_secrets_file: str, scopes: list[PhotosScopes], token_file: str | None = None) dataclass

Load, refresh, or obtain Google OAuth credentials for Photos scopes.

Attributes:

Name Type Description
client_secrets_file str

Path to the Google OAuth client secrets JSON (installed app).

scopes list[PhotosScopes]

API scopes to request; values are sent to the authorization server.

token_file str | None

Optional path to persist the authorized user token; parent dirs are created on save. If omitted, tokens are only kept in memory.

authorize_interactive() -> Credentials

Run the local-server OAuth flow and optionally persist the token.

Returns:

Type Description
Credentials

Newly authorized credentials.

Source code in q2google/gphotos/auth.py
def authorize_interactive(self) -> Credentials:
    """Run the local-server OAuth flow and optionally persist the token.

    Returns:
        Newly authorized credentials.
    """
    flow = InstalledAppFlow.from_client_secrets_file(
        self.client_secrets_file,
        scopes=[s.value for s in self.scopes],
    )
    creds = flow.run_local_server(open_browser=False)
    if self.token_file is not None:
        self._save_token(creds)
    return creds

ensure_credentials() -> Credentials

Return stored valid credentials, or complete an interactive authorization.

Returns:

Type Description
Credentials

Credentials that are valid for API calls (refreshed from disk when possible).

Raises:

Type Description
Various ``google_auth`` exceptions

If the OAuth flow or refresh fails in a way not handled by load_saved_credentials.

Source code in q2google/gphotos/auth.py
def ensure_credentials(self) -> Credentials:
    """Return stored valid credentials, or complete an interactive authorization.

    Returns:
        Credentials that are valid for API calls (refreshed from disk when possible).

    Raises:
        Various ``google_auth`` exceptions: If the OAuth flow or refresh fails in a way not
            handled by :meth:`load_saved_credentials`.
    """
    saved = self.load_saved_credentials()
    if saved is not None and saved.valid:
        return saved
    return self.authorize_interactive()

load_saved_credentials() -> Credentials | None

Load credentials from disk and refresh if expired.

Returns:

Type Description
Credentials | None

Valid or refreshed credentials, or None if there is no token file, if RefreshError occurs during refresh (the token file is then removed), or if stored credentials cannot be used.

Source code in q2google/gphotos/auth.py
def load_saved_credentials(self) -> Credentials | None:
    """Load credentials from disk and refresh if expired.

    Returns:
        Valid or refreshed credentials, or ``None`` if there is no token file,
        if ``RefreshError`` occurs during refresh (the token file is then removed),
        or if stored credentials cannot be used.
    """
    if self.token_file is None or not os.path.isfile(self.token_file):
        return None

    creds = Credentials.from_authorized_user_file(self.token_file)
    if creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
            self._save_token(creds)
        except RefreshError:
            try:
                os.unlink(self.token_file)
            except OSError:
                pass
            return None
    return creds
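
The interplay between `load_saved_credentials` and `ensure_credentials` is easier to follow as a decision table. The sketch below reproduces the branching with a stub instead of real Google credentials; `StubCreds`, `load_saved`, and `ensure` are hypothetical names, and a `RuntimeError` stands in for `RefreshError`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StubCreds:
    """Hypothetical stand-in for google-auth credentials."""

    expired: bool
    refresh_token: str | None
    refresh_ok: bool = True

    @property
    def valid(self) -> bool:
        return not self.expired

    def refresh(self) -> None:
        if not self.refresh_ok:
            raise RuntimeError("refresh failed")  # stands in for RefreshError
        self.expired = False


def load_saved(creds: StubCreds | None) -> StubCreds | None:
    if creds is None:  # no token file on disk
        return None
    if creds.expired and creds.refresh_token:
        try:
            creds.refresh()
        except RuntimeError:
            return None  # the real method also deletes the token file here
    return creds


def ensure(creds: StubCreds | None) -> str:
    saved = load_saved(creds)
    if saved is not None and saved.valid:
        return "reuse"
    return "interactive"


outcomes = [
    ensure(None),
    ensure(StubCreds(expired=False, refresh_token="r")),
    ensure(StubCreds(expired=True, refresh_token="r")),
    ensure(StubCreds(expired=True, refresh_token="r", refresh_ok=False)),
    ensure(StubCreds(expired=True, refresh_token=None)),
]
```

The last case shows that expired credentials without a refresh token are still returned by the loader, and it is the `valid` check in `ensure_credentials` that sends the caller to the interactive flow.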

MediaItemBatchCreateRequest

Bases: BaseModel

Request body for mediaItems:batchCreate.

Examples:

>>> MediaItemBatchCreateRequest(
...     newMediaItems=[NewMediaItem(simpleMediaItem=SimpleMediaItem(uploadToken="t"))],
... )

albumId: str | None = None class-attribute instance-attribute

If set, add created items to this album id.

newMediaItems: list[NewMediaItem] instance-attribute

Items to create from prior upload tokens.

MediaItemBatchCreateResponse

Bases: BaseModel

Response body from mediaItems:batchCreate.

Examples:

>>> MediaItemBatchCreateResponse(newMediaItemResults=[])

newMediaItemResults: list[NewMediaItemResult] instance-attribute

Parallel list of results aligned with the request newMediaItems order.

NewMediaItem

Bases: BaseModel

One new media item entry inside a batch create request.

Examples:

>>> NewMediaItem(simpleMediaItem=SimpleMediaItem(uploadToken="tok"))

description: str | None = None class-attribute instance-attribute

Optional description stored with the media item.

simpleMediaItem: SimpleMediaItem instance-attribute

Upload token and optional filename for this item.

PhotosScopes

Bases: str, Enum

OAuth 2.0 scope URLs accepted by the Google Photos Library API.

Examples:

>>> PhotosScopes.LIBRARY_APPENDONLY

LIBRARY_APPENDONLY = 'https://www.googleapis.com/auth/photoslibrary.appendonly' class-attribute instance-attribute

Create albums and upload media; cannot read or delete unrelated library content.

LIBRARY_EDIT_APP_CREATED = 'https://www.googleapis.com/auth/photoslibrary.edit.appcreateddata' class-attribute instance-attribute

Edit and delete media and albums created by this app.

LIBRARY_READONLY_APP_CREATED = 'https://www.googleapis.com/auth/photoslibrary.readonly.appcreateddata' class-attribute instance-attribute

Read media and albums created by this app only.

ResumableUploadSession

Bases: BaseModel

State parsed from Google resumable upload response headers (and optional body token).

Examples:

>>> ResumableUploadSession.model_validate(
...     {
...         "X-Goog-Upload-Status": "active",
...         "X-GUploader-UploadID": "upload-id",
...         "Date": "Mon, 01 Jan 2024 00:00:00 GMT",
...     }
... )

date: datetime = Field(alias='Date') class-attribute instance-attribute

Response Date header value, parsed into a naive datetime (no tzinfo is attached).

granularity: int | None = Field(alias='X-Goog-Upload-Chunk-Granularity', default=None) class-attribute instance-attribute

Preferred chunk size in bytes, if the server advertises one.

status: str = Field(alias='X-Goog-Upload-Status') class-attribute instance-attribute

Upload lifecycle status string from X-Goog-Upload-Status.

upload_id: str = Field(alias='X-GUploader-UploadID') class-attribute instance-attribute

Server upload identifier from X-GUploader-UploadID.

upload_token: str | None = None class-attribute instance-attribute

Raw upload token from the response body when the upload is complete.

upload_url: str | None = Field(alias='X-Goog-Upload-URL', default=None) class-attribute instance-attribute

URL to POST the next chunk or query, when provided by the server.

parse_date(v: str) -> datetime

Parse RFC 2822-style date strings from upload responses.

Parameters:

Name Type Description Default
v str

Date header value as returned by Google.

required

Returns:

Type Description
datetime

Parsed datetime.

Source code in q2google/gphotos/models.py
@field_validator("date", mode="before")
@classmethod
def parse_date(cls, v: str) -> datetime:
    """Parse RFC 2822-style date strings from upload responses.

    Args:
        v: Date header value as returned by Google.

    Returns:
        Parsed datetime.
    """
    return datetime.strptime(v, "%a, %d %b %Y %H:%M:%S %Z")
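
Parsing with `%Z` accepts the `GMT` suffix but does not attach a timezone, so the result is a naive datetime. The sketch below shows that behavior next to the standard library's `email.utils.parsedate_to_datetime`, which is mentioned only as a possible alternative when an aware value is wanted; the validator above does not use it.

```python
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime

header = "Mon, 01 Jan 2024 00:00:00 GMT"

# Same format string as the validator: the zone name is consumed, tzinfo stays None.
naive = datetime.strptime(header, "%a, %d %b %Y %H:%M:%S %Z")

# Alternative from the standard library: returns an aware datetime for this header.
aware = parsedate_to_datetime(header)
```

Comparing `naive` with an aware datetime raises `TypeError`, so code that mixes the two needs to normalize first.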

SimpleMediaItem

Bases: BaseModel

Minimal media reference for batch create (upload token plus optional filename).

Examples:

>>> SimpleMediaItem(uploadToken="token", fileName="photo.jpg")

fileName: str | None = None class-attribute instance-attribute

Suggested filename for the created item.

uploadToken: str instance-attribute

Upload token obtained after a successful resumable upload.