"""
Wrappers around Google APIs.
"""
import json
import os
from base64 import b64encode
from collections import OrderedDict
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
try:
# this is only an issue with Python 2.7 and if the
# Google-API packages were not installed with msl-io
from enum import Enum
except ImportError:
Enum = object
# having the Google-API packages are optional
try:
from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import DEFAULT_CHUNK_SIZE
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
HAS_GOOGLE_API = True
except ImportError:
DEFAULT_CHUNK_SIZE = 100 * 1024 * 1024
HAS_GOOGLE_API = False
from .constants import HOME_DIR
from .constants import IS_PYTHON2
def _authenticate(token, client_secrets_file, scopes):
"""Authenticate with a Google API.
Parameters
----------
token : :class:`str`
The path to a token file. If it does not exist then it will be created.
client_secrets_file : :class:`str`
The "client secrets" file to use to generate the OAuth credentials.
scopes : :class:`list` of :class:`str`
The list of scopes to enable.
Returns
-------
:class:`google.oauth2.credentials.Credentials`
The OAuth 2.0 credentials for the user.
"""
if not HAS_GOOGLE_API:
raise RuntimeError(
'You must install the Google-API packages, run\n'
' pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib'
)
credentials = None
# load the token from an environment variable if it exists
# ignore the '.json' extension
token_env_name = os.path.basename(token)[:-5].replace('-', '_').upper()
if token_env_name in os.environ:
info = json.loads(os.environ[token_env_name])
credentials = Credentials.from_authorized_user_info(info, scopes=scopes)
# load the cached token file if it exists
if not credentials and os.path.isfile(token):
credentials = Credentials.from_authorized_user_file(token, scopes=scopes)
# if there are no (valid) credentials available then let the user log in
if not credentials or not credentials.valid:
if credentials and credentials.expired and credentials.refresh_token:
try:
credentials.refresh(Request())
except RefreshError as err:
if os.path.isfile(token) and not os.getenv('MSL_IO_RUNNING_TESTS'):
message = '{}: {}\nDo you want to delete the token file and re-authenticate ' \
'(y/N)? '.format(err.__class__.__name__, err.args[0])
if IS_PYTHON2:
yes_no = raw_input(message)
else:
yes_no = input(message)
if yes_no.lower().startswith('y'):
os.remove(token)
return _authenticate(token, client_secrets_file, scopes)
raise
else:
if not client_secrets_file:
raise OSError('You must specify the path to a "client secrets" file as the credentials')
flow = InstalledAppFlow.from_client_secrets_file(client_secrets_file, scopes)
credentials = flow.run_local_server(port=0)
# save the credentials for the next run
if token_env_name in os.environ:
os.environ[token_env_name] = credentials.to_json()
else:
# make sure that all parent directories exist before creating the file
dirname = os.path.dirname(token)
if dirname and not os.path.isdir(dirname):
os.makedirs(dirname)
with open(token, mode='wt') as fp:
fp.write(credentials.to_json())
return credentials
[docs]class GoogleAPI(object):
def __init__(self, service, version, credentials, scopes, read_only, account):
"""Base class for all Google APIs."""
name = '{}-'.format(account) if account else ''
readonly = '-readonly' if read_only else ''
filename = '{}token-{}{}.json'.format(name, service, readonly)
token = os.path.join(HOME_DIR, filename)
oauth = _authenticate(token, credentials, scopes)
self._service = build(service, version, credentials=oauth)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@property
def service(self):
"""The Resource object with methods for interacting with the API service."""
return self._service
[docs] def close(self):
"""Close the connection to the API service."""
self._service.close()
[docs]class GDrive(GoogleAPI):
MIME_TYPE_FOLDER = 'application/vnd.google-apps.folder'
ROOT_NAMES = ['Google Drive', 'My Drive', 'Drive']
def __init__(self, account=None, credentials=None, read_only=True, scopes=None):
"""Interact with Google Drive.
.. attention::
You must follow the instructions in the prerequisites section for setting up the
`Drive API <https://developers.google.com/drive/api/quickstart/python#prerequisites>`_
before you can use this class. It is also useful to be aware of the
`refresh token expiration <https://developers.google.com/identity/protocols/oauth2#expiration>`_
policy.
.. _Media type: https://www.iana.org/assignments/media-types/media-types.xhtml
.. _Drive MIME type: https://developers.google.com/drive/api/guides/mime-types
Parameters
----------
account : :class:`str`, optional
Since a person may have multiple Google accounts, and multiple people
may run the same code, this parameter decides which token to load
to authenticate with the Google API. The value can be any text (or
:data:`None`) that you want to associate with a particular Google
account, provided that it contains valid characters for a filename.
The value that you chose when you authenticated with your `credentials`
should be used for all future instances of this class to access that
particular Google account. You can associate a different value with
a Google account at any time (by passing in a different `account`
value), but you will be asked to authenticate with your `credentials`
again, or, alternatively, you can rename the token files located in
:const:`~msl.io.constants.HOME_DIR` to match the new `account` value.
credentials : :class:`str`, optional
The path to the `client secrets` OAuth credential file. This
parameter only needs to be specified the first time that you
authenticate with a particular Google account or if you delete
the token file that was created when you previously authenticated.
read_only : :class:`bool`, optional
Whether to interact with Google Drive in read-only mode.
scopes : :class:`list` of :class:`str`, optional
The list of scopes to enable for the Google API. See
`Drive scopes <https://developers.google.com/identity/protocols/oauth2/scopes#drive>`_
for more details. If not specified then default scopes are chosen
based on the value of `read_only`.
"""
if not scopes:
if read_only:
scopes = [
'https://www.googleapis.com/auth/drive.readonly',
'https://www.googleapis.com/auth/drive.metadata.readonly'
]
else:
scopes = [
'https://www.googleapis.com/auth/drive',
'https://www.googleapis.com/auth/drive.metadata',
]
super(GDrive, self).__init__(
'drive', 'v3', credentials, scopes, read_only, account)
self._files = self._service.files()
self._drives = self._service.drives()
@staticmethod
def _folder_hierarchy(folder):
# create a list of sub-folder names in the folder hierarchy
f = folder
names = []
while True:
f, name = os.path.split(f)
if not name or name in GDrive.ROOT_NAMES:
break
names.append(name)
return names[::-1]
[docs] def folder_id(self, folder, parent_id=None):
"""Get the ID of a Google Drive folder.
Parameters
----------
folder : :class:`str`
The path to a Google Drive file.
parent_id : :class:`str`, optional
The ID of the parent folder that `folder` is relative to. If not
specified then `folder` is relative to the `My Drive` root folder.
If `folder` is in a `Shared drive` then you must specify the
ID of a parent folder.
Returns
-------
:class:`str`
The folder ID.
"""
folder_id = parent_id or 'root'
names = GDrive._folder_hierarchy(folder)
for name in names:
q = '"{}" in parents and name="{}" and trashed=false and mimeType="{}"'.format(
folder_id, name, GDrive.MIME_TYPE_FOLDER
)
response = self._files.list(
q=q,
fields='files(id,name)',
includeItemsFromAllDrives=True,
supportsAllDrives=True,
).execute()
files = response['files']
if not files:
raise OSError('Not a valid Google Drive folder {!r}'.format(folder))
if len(files) > 1:
matches = '\n '.join(str(file) for file in files)
raise OSError('Multiple folders exist for {!r}\n {}'.format(name, matches))
first = files[0]
assert name == first['name'], '{!r} != {!r}'.format(name, first['name'])
folder_id = first['id']
return folder_id
[docs] def file_id(self, file, mime_type=None, folder_id=None):
"""Get the ID of a Google Drive file.
Parameters
----------
file : :class:`str`
The path to a Google Drive file.
mime_type : :class:`str`, optional
The `Drive MIME type`_ or `Media type`_ to use to filter the results.
folder_id : :class:`str`, optional
The ID of the folder that `file` is relative to. If not specified
then `file` is relative to the `My Drive` root folder.
If `file` is in a `Shared drive` then you must specify the
ID of a parent folder.
Returns
-------
:class:`str`
The file ID.
"""
folders, name = os.path.split(file)
folder_id = self.folder_id(folders, parent_id=folder_id)
q = '"{}" in parents and name="{}" and trashed=false'.format(folder_id, name)
if not mime_type:
q += ' and mimeType!="{}"'.format(GDrive.MIME_TYPE_FOLDER)
else:
q += ' and mimeType="{}"'.format(mime_type)
response = self._files.list(
q=q,
fields='files(id,name,mimeType)',
includeItemsFromAllDrives=True,
supportsAllDrives=True,
).execute()
files = response['files']
if not files:
raise OSError('Not a valid Google Drive file {!r}'.format(file))
if len(files) > 1:
mime_types = '\n '.join(f['mimeType'] for f in files)
raise OSError('Multiple files exist for {!r}. '
'Filter by MIME type:\n {}'.format(file, mime_types))
first = files[0]
assert name == first['name'], '{!r} != {!r}'.format(name, first['name'])
return first['id']
[docs] def is_file(self, file, mime_type=None, folder_id=None):
"""Check if a file exists.
Parameters
----------
file : :class:`str`
The path to a Google Drive file.
mime_type : :class:`str`, optional
The `Drive MIME type`_ or `Media type`_ to use to filter the results.
folder_id : :class:`str`, optional
The ID of the folder that `file` is relative to. If not specified
then `file` is relative to the `My Drive` root folder.
If `file` is in a `Shared drive` then you must specify the
ID of a parent folder.
Returns
-------
:class:`bool`
Whether the file exists.
"""
try:
self.file_id(file, mime_type=mime_type, folder_id=folder_id)
except OSError as err:
return str(err).startswith('Multiple files')
else:
return True
[docs] def is_folder(self, folder, parent_id=None):
"""Check if a folder exists.
Parameters
----------
folder : :class:`str`
The path to a Google Drive folder.
parent_id : :class:`str`, optional
The ID of the parent folder that `folder` is relative to. If not
specified then `folder` is relative to the `My Drive` root folder.
If `folder` is in a `Shared drive` then you must specify the
ID of a parent folder.
Returns
-------
:class:`bool`
Whether the folder exists.
"""
try:
self.folder_id(folder, parent_id=parent_id)
except OSError as err:
return str(err).startswith('Multiple folders')
else:
return True
[docs] def create_folder(self, folder, parent_id=None):
"""Create a folder.
Makes all intermediate-level folders needed to contain the leaf directory.
Parameters
----------
folder : :class:`str`
The folder(s) to create, for example, ``'folder1'`` or
``'folder1/folder2/folder3'``.
parent_id : :class:`str`, optional
The ID of the parent folder that `folder` is relative to. If not
specified then `folder` is relative to the `My Drive` root folder.
If `folder` is in a `Shared drive` then you must specify the
ID of a parent folder.
Returns
-------
:class:`str`
The ID of the last (right most) folder that was created.
"""
names = GDrive._folder_hierarchy(folder)
response = {'id': parent_id or 'root'}
for name in names:
request = self._files.create(
body={
'name': name,
'mimeType': GDrive.MIME_TYPE_FOLDER,
'parents': [response['id']],
},
fields='id',
supportsAllDrives=True,
)
response = request.execute()
return response['id']
[docs] def delete(self, file_or_folder_id):
"""Delete a file or a folder.
Files that are in read-only mode cannot be deleted.
.. danger::
Permanently deletes the file or folder owned by the user without
moving it to the trash. If the target is a folder, then all files
and sub-folders contained within the folder (that are owned by the
user) are also permanently deleted.
Parameters
----------
file_or_folder_id : :class:`str`
The ID of the file or folder to delete.
"""
if self.is_read_only(file_or_folder_id):
# The API allows for a file to be deleted if it is in read-only mode,
# but we will not allow it to be deleted
raise RuntimeError('Cannot delete the file since it is in read-only mode')
self._files.delete(
fileId=file_or_folder_id,
supportsAllDrives=True,
).execute()
[docs] def empty_trash(self):
"""Permanently delete all files in the trash."""
self._files.emptyTrash().execute()
[docs] def upload(self, file, folder_id=None, mime_type=None, resumable=False, chunk_size=DEFAULT_CHUNK_SIZE):
"""Upload a file.
Parameters
----------
file : :class:`str`
The file to upload.
folder_id : :class:`str`, optional
The ID of the folder to upload the file to. If not specified then
uploads to the `My Drive` root folder.
mime_type : :class:`str`, optional
The `Drive MIME type`_ or `Media type`_ of the file
(e.g., ``'text/csv'``). If not specified then a type will be
guessed based on the file extension.
resumable : :class:`bool`
Whether the upload can be resumed.
chunk_size : :class:`int`
The file will be uploaded in chunks of this many bytes. Only used
if `resumable` is :data:`True`. Pass in a value of -1 if the file
is to be uploaded in a single chunk. Note that Google App Engine
has a 5MB limit on request size, so you should never set
`chunk_size` to be >5MB or to -1 (if the file size is >5MB).
Returns
-------
:class:`str`
The ID of the file that was uploaded.
"""
parent_id = folder_id or 'root'
filename = os.path.basename(file)
body = {'name': filename, 'parents': [parent_id]}
if mime_type:
body['mimeType'] = mime_type
request = self._files.create(
body=body,
media_body=MediaFileUpload(
file,
mimetype=mime_type,
chunksize=chunk_size,
resumable=resumable
),
fields='id',
supportsAllDrives=True,
)
response = request.execute()
return response['id']
[docs] def download(self, file_id, save_to=None, num_retries=0, chunk_size=DEFAULT_CHUNK_SIZE, callback=None):
"""Download a file.
Parameters
----------
file_id : :class:`str`
The ID of the file to download.
save_to : :term:`path-like <path-like object>` or :term:`file-like <file object>`, optional
The location to save the file to. If a directory is specified
then the file will be saved to that directory using the filename
of the remote file. To save the file with a new filename, specify
the new filename in `save_to`. Default is to save the file to the
current working directory using the remote filename.
num_retries : :class:`int`, optional
The number of times to retry the download.
If zero (default) then attempt the request only once.
chunk_size : :class:`int`, optional
The file will be downloaded in chunks of this many bytes.
callback
The callback to call after each chunk of the file is downloaded.
The `callback` accepts one positional argument, for example::
def handler(file):
print(file.progress(), file.total_size, file.resumable_progress)
drive.download('0Bwab3C2ejYSdM190b2psXy1C50P', callback=handler)
"""
if hasattr(save_to, 'write'):
fh = save_to
else:
if not save_to or os.path.isdir(save_to):
response = self._files.get(
fileId=file_id,
fields='name',
supportsAllDrives=True,
).execute()
name = response['name']
if save_to and os.path.isdir(save_to):
save_to = os.path.join(save_to, name)
else:
save_to = name
fh = open(save_to, mode='wb')
request = self._files.get_media(fileId=file_id, supportsAllDrives=True)
downloader = MediaIoBaseDownload(fh, request, chunksize=chunk_size)
done = False
while not done:
status, done = downloader.next_chunk(num_retries=num_retries)
if callback:
callback(status)
if fh is not save_to: # then close the file that was opened
fh.close()
[docs] def path(self, file_or_folder_id):
"""Convert an ID to a path.
Parameters
----------
file_or_folder_id : :class:`str`
The ID of a file or folder.
Returns
-------
:class:`str`
The corresponding path of the ID.
"""
names = []
while True:
request = self._files.get(
fileId=file_or_folder_id,
fields='name,parents',
supportsAllDrives=True,
)
response = request.execute()
names.append(response['name'])
parents = response.get('parents', [])
if not parents:
break
if len(parents) > 1:
raise OSError('Multiple parents exist. This case has not been handled yet. Contact developers.')
file_or_folder_id = response['parents'][0]
return '/'.join(names[::-1])
[docs] def move(self, source_id, destination_id):
"""Move a file or a folder.
When moving a file or folder between `My Drive` and a `Shared drive`
the access permissions will change.
Moving a file or folder does not change its ID, only the ID of
its `parent` changes (i.e., `source_id` will remain the same
after the move).
Parameters
----------
source_id : :class:`str`
The ID of a file or folder to move.
destination_id : :class:`str`
The ID of the destination folder. To move the file or folder to the
`My Drive` root folder then specify ``'root'`` as the `destination_id`.
"""
params = {'fileId': source_id, 'supportsAllDrives': True}
try:
self._files.update(addParents=destination_id, **params).execute()
except HttpError as e:
if 'exactly one parent' not in str(e):
raise
# Handle the following error:
# A shared drive item must have exactly one parent
response = self._files.get(fields='parents', **params).execute()
self._files.update(
addParents=destination_id,
removeParents=','.join(response['parents']),
**params).execute()
[docs] def shared_drives(self):
"""Returns the IDs and names of all `Shared drives`.
Returns
-------
:class:`dict`
The keys are the IDs of the shared drives and the values are the
names of the shared drives.
"""
drives = {}
next_page_token = ''
while True:
response = self._drives.list(pageSize=100, pageToken=next_page_token).execute()
drives.update(dict((d['id'], d['name']) for d in response['drives']))
next_page_token = response.get('nextPageToken')
if not next_page_token:
break
return drives
[docs] def copy(self, file_id, folder_id=None, name=None):
"""Copy a file.
Parameters
----------
file_id : :class:`str`
The ID of a file to copy. Folders cannot be copied.
folder_id : :class:`str`, optional
The ID of the destination folder. If not specified then creates
a copy in the same folder that the original file is located in.
To copy the file to the `My Drive` root folder then specify
``'root'`` as the `folder_id`.
name : :class:`str`, optional
The filename of the destination file.
Returns
-------
:class:`str`
The ID of the destination file.
"""
response = self._files.copy(
fileId=file_id,
fields='id',
supportsAllDrives=True,
body={
'name': name,
'parents': [folder_id] if folder_id else None,
},
).execute()
return response['id']
[docs] def rename(self, file_or_folder_id, new_name):
"""Rename a file or folder.
Renaming a file or folder does not change its ID.
Parameters
----------
file_or_folder_id : :class:`str`
The ID of a file or folder.
new_name : :class:`str`
The new name of the file or folder.
"""
self._files.update(
fileId=file_or_folder_id,
supportsAllDrives=True,
body={'name': new_name},
).execute()
[docs] def read_only(self, file_id, read_only, reason=''):
"""Set a file to be in read-only mode.
Parameters
----------
file_id : :class:`str`
The ID of a file.
read_only : :class:`bool`
Whether to set the file to be in read-only mode.
reason : :class:`str`, optional
The reason for putting the file in read-only mode.
Only used if `read_only` is :data:`True`.
"""
restrictions = {'readOnly': read_only}
if read_only:
restrictions['reason'] = reason
# If `file_id` is already in read-only mode, and it is being set
# to read-only mode then the API raises a TimeoutError waiting for
# a response. To avoid this error, check the mode and if it is
# already in read-only mode we are done.
if self.is_read_only(file_id):
return
self._files.update(
fileId=file_id,
supportsAllDrives=True,
body={'contentRestrictions': [restrictions]}
).execute()
[docs] def is_read_only(self, file_id):
"""Returns whether the file is in read-only mode.
Parameters
----------
file_id : :class:`str`
The ID of a file.
Returns
-------
:class:`bool`
Whether the file is in read-only mode.
"""
response = self._files.get(
fileId=file_id,
supportsAllDrives=True,
fields='contentRestrictions',
).execute()
restrictions = response.get('contentRestrictions')
if not restrictions:
return False
return restrictions[0]['readOnly']
[docs]class GValueOption(Enum):
"""Determines how values should be returned."""
FORMATTED = 'FORMATTED_VALUE'
"""Values will be calculated and formatted in the reply according to the
cell's formatting. Formatting is based on the spreadsheet's locale, not
the requesting user's locale. For example, if A1 is 1.23 and A2 is =A1
and formatted as currency, then A2 would return "$1.23"."""
UNFORMATTED = 'UNFORMATTED_VALUE'
"""Values will be calculated, but not formatted in the reply.
For example, if A1 is 1.23 and A2 is =A1 and formatted as currency, then
A2 would return the number 1.23."""
FORMULA = 'FORMULA'
"""Values will not be calculated. The reply will include the formulas.
For example, if A1 is 1.23 and A2 is =A1 and formatted as currency,
then A2 would return "=A1"."""
[docs]class GDateTimeOption(Enum):
"""Determines how dates should be returned."""
SERIAL_NUMBER = 'SERIAL_NUMBER'
"""Instructs date, time, datetime, and duration fields to be output as
doubles in "serial number" format, as popularized by Lotus 1-2-3. The
whole number portion of the value (left of the decimal) counts the days
since December 30th 1899. The fractional portion (right of the decimal)
counts the time as a fraction of the day. For example, January 1st 1900
at noon would be 2.5, 2 because it's 2 days after December 30st 1899,
and .5 because noon is half a day. February 1st 1900 at 3pm would be
33.625. This correctly treats the year 1900 as not a leap year."""
FORMATTED_STRING = 'FORMATTED_STRING'
"""Instructs date, time, datetime, and duration fields to be output as
strings in their given number format (which is dependent on the
spreadsheet locale)."""
[docs]class GCellType(Enum):
"""The spreadsheet cell data type."""
BOOLEAN = 'BOOLEAN'
CURRENCY = 'CURRENCY'
DATE = 'DATE'
DATE_TIME = 'DATE_TIME'
EMPTY = 'EMPTY'
ERROR = 'ERROR'
NUMBER = 'NUMBER'
PERCENT = 'PERCENT'
SCIENTIFIC = 'SCIENTIFIC'
STRING = 'STRING'
TEXT = 'TEXT'
TIME = 'TIME'
UNKNOWN = 'UNKNOWN'
GCell = namedtuple('GCell', ('value', 'type', 'formatted'))
"""The information about a Google Sheets cell.
.. attribute:: value
The value of the cell.
.. attribute:: type
:class:`GCellType`: The data type of `value`.
.. attribute:: formatted
:class:`str`: The formatted value (i.e., how the `value` is displayed in the cell).
"""
[docs]class GSheets(GoogleAPI):
MIME_TYPE = 'application/vnd.google-apps.spreadsheet'
SERIAL_NUMBER_ORIGIN = datetime(1899, 12, 30)
def __init__(self, account=None, credentials=None, read_only=True, scopes=None):
"""Interact with Google Sheets.
.. attention::
You must follow the instructions in the prerequisites section for setting up the
`Sheets API <https://developers.google.com/sheets/api/quickstart/python#prerequisites>`_
before you can use this class. It is also useful to be aware of the
`refresh token expiration <https://developers.google.com/identity/protocols/oauth2#expiration>`_
policy.
Parameters
----------
account : :class:`str`, optional
Since a person may have multiple Google accounts, and multiple people
may run the same code, this parameter decides which token to load
to authenticate with the Google API. The value can be any text (or
:data:`None`) that you want to associate with a particular Google
account, provided that it contains valid characters for a filename.
The value that you chose when you authenticated with your `credentials`
should be used for all future instances of this class to access that
particular Google account. You can associate a different value with
a Google account at any time (by passing in a different `account`
value), but you will be asked to authenticate with your `credentials`
again, or, alternatively, you can rename the token files located in
:const:`~msl.io.constants.HOME_DIR` to match the new `account` value.
credentials : :class:`str`, optional
The path to the `client secrets` OAuth credential file. This
parameter only needs to be specified the first time that you
authenticate with a particular Google account or if you delete
the token file that was created when you previously authenticated.
read_only : :class:`bool`, optional
Whether to interact with Google Sheets in read-only mode.
scopes : :class:`list` of :class:`str`, optional
The list of scopes to enable for the Google API. See
`Sheets scopes <https://developers.google.com/identity/protocols/oauth2/scopes#sheets>`_
for more details. If not specified then default scopes are chosen
based on the value of `read_only`.
"""
if not scopes:
if read_only:
scopes = ['https://www.googleapis.com/auth/spreadsheets.readonly']
else:
scopes = ['https://www.googleapis.com/auth/spreadsheets']
super(GSheets, self).__init__(
'sheets', 'v4', credentials, scopes, read_only, account)
self._spreadsheets = self._service.spreadsheets()
[docs] def append(self, values, spreadsheet_id, cell=None, sheet=None, row_major=True, raw=False):
"""Append values to a sheet.
Returns
-------
values
The value(s) to append
spreadsheet_id : :class:`str`
The ID of a Google Sheets file.
cell : :class:`str`, optional
The cell (top-left corner) to start appending the values to. If the
cell already contains data then new rows are inserted and the values
are written to the new rows. For example, ``'D100'``.
sheet : :class:`str`, optional
The name of a sheet in the spreadsheet to append the values to.
If not specified and only one sheet exists in the spreadsheet
then automatically determines the sheet name; however, it is
more efficient to specify the name of the sheet.
row_major : :class:`bool`, optional
Whether to append the values in row-major or column-major order.
raw : :class:`bool`, optional
Determines how the values should be interpreted. If :data:`True`,
the values will not be parsed and will be stored as-is. If
:data:`False`, the values will be parsed as if the user typed
them into the UI. Numbers will stay as numbers, but strings may
be converted to numbers, dates, etc. following the same rules
that are applied when entering text into a cell via the Google
Sheets UI.
"""
self._spreadsheets.values().append(
spreadsheetId=spreadsheet_id,
range=self._get_range(sheet, cell, spreadsheet_id),
valueInputOption='RAW' if raw else 'USER_ENTERED',
insertDataOption='INSERT_ROWS',
body={
'values': self._values(values),
'majorDimension': 'ROWS' if row_major else 'COLUMNS',
},
).execute()
[docs] def write(self, values, spreadsheet_id, cell, sheet=None, row_major=True, raw=False):
"""Write values to a sheet.
If a cell that is being written to already contains a value,
the value in that cell is overwritten with the new value.
Returns
-------
values
The value(s) to write.
spreadsheet_id : :class:`str`
The ID of a Google Sheets file.
cell : :class:`str`, optional
The cell (top-left corner) to start writing the values to.
For example, ``'C9'``.
sheet : :class:`str`, optional
The name of a sheet in the spreadsheet to write the values to.
If not specified and only one sheet exists in the spreadsheet
then automatically determines the sheet name; however, it is
more efficient to specify the name of the sheet.
row_major : :class:`bool`, optional
Whether to write the values in row-major or column-major order.
raw : :class:`bool`, optional
Determines how the values should be interpreted. If :data:`True`,
the values will not be parsed and will be stored as-is. If
:data:`False`, the values will be parsed as if the user typed
them into the UI. Numbers will stay as numbers, but strings may
be converted to numbers, dates, etc. following the same rules
that are applied when entering text into a cell via the Google
Sheets UI.
"""
self._spreadsheets.values().update(
spreadsheetId=spreadsheet_id,
range=self._get_range(sheet, cell, spreadsheet_id),
valueInputOption='RAW' if raw else 'USER_ENTERED',
body={
'values': self._values(values),
'majorDimension': 'ROWS' if row_major else 'COLUMNS',
},
).execute()
[docs] def copy(self, name_or_id, spreadsheet_id, destination_spreadsheet_id):
"""Copy a sheet from one spreadsheet to another spreadsheet.
Parameters
----------
name_or_id : :class:`str` or :class:`int`
The name or ID of the sheet to copy.
spreadsheet_id : :class:`str`
The ID of the spreadsheet that contains the sheet.
destination_spreadsheet_id : :class:`str`
The ID of a spreadsheet to copy the sheet to.
Returns
-------
:class:`int`
The ID of the sheet in the destination spreadsheet.
"""
if isinstance(name_or_id, int):
sheet_id = name_or_id
else:
sheet_id = self.sheet_id(name_or_id, spreadsheet_id)
response = self._spreadsheets.sheets().copyTo(
spreadsheetId=spreadsheet_id,
sheetId=sheet_id,
body={
'destination_spreadsheet_id': destination_spreadsheet_id,
},
).execute()
return response['sheetId']
[docs] def sheet_id(self, name, spreadsheet_id):
"""Returns the ID of a sheet.
Parameters
----------
name : :class:`str`
The name of the sheet.
spreadsheet_id : :class:`str`
The ID of the spreadsheet.
Returns
-------
:class:`int`
The ID of the sheet.
"""
request = self._spreadsheets.get(spreadsheetId=spreadsheet_id)
response = request.execute()
for sheet in response['sheets']:
if sheet['properties']['title'] == name:
return sheet['properties']['sheetId']
raise ValueError('There is no sheet named {!r}'.format(name))
[docs] def rename_sheet(self, name_or_id, new_name, spreadsheet_id):
"""Rename a sheet.
Parameters
----------
name_or_id : :class:`str` or :class:`int`
The name or ID of the sheet to rename.
new_name : :class:`str`
The new name of the sheet.
spreadsheet_id : :class:`str`
The ID of the spreadsheet that contains the sheet.
"""
if isinstance(name_or_id, int):
sheet_id = name_or_id
else:
sheet_id = self.sheet_id(name_or_id, spreadsheet_id)
self._spreadsheets.batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': [{
'updateSheetProperties': {
'properties': {
'sheetId': sheet_id,
'title': new_name,
},
'fields': 'title',
}
}]
}
).execute()
[docs] def add_sheets(self, names, spreadsheet_id):
"""Add sheets to a spreadsheet.
Parameters
----------
names : :class:`str` or :class:`list` of :class:`str`
The name(s) of the new sheet(s) to add.
spreadsheet_id : :class:`str`
The ID of the spreadsheet to add the sheet(s) to.
Returns
-------
:class:`dict`
The keys are the IDs of the new sheets and the values are the names.
"""
if isinstance(names, str):
names = [names]
response = self._spreadsheets.batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': [{
'addSheet': {
'properties': {
'title': name
}
}
} for name in names]
}
).execute()
return OrderedDict((r['addSheet']['properties']['sheetId'],
r['addSheet']['properties']['title'])
for r in response['replies'])
[docs] def delete_sheets(self, names_or_ids, spreadsheet_id):
"""Delete sheets from a spreadsheet.
Parameters
----------
names_or_ids : :class:`str`, :class:`int` or :class:`list`
The name(s) or ID(s) of the sheet(s) to delete.
spreadsheet_id : :class:`str`
The ID of the spreadsheet to delete the sheet(s) from.
"""
if not isinstance(names_or_ids, (list, tuple)):
names_or_ids = [names_or_ids]
self._spreadsheets.batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': [{
'deleteSheet': {
'sheetId': n if isinstance(n, int) else self.sheet_id(n, spreadsheet_id)
}
} for n in names_or_ids]
}
).execute()
[docs] def create(self, name, sheet_names=None):
"""Create a new spreadsheet.
The spreadsheet will be created in the `My Drive` root folder.
To move it to a different folder use :meth:`GDrive.create_folder`
and/or :meth:`GDrive.move`.
Parameters
----------
name : :class:`str`
The name of the spreadsheet.
sheet_names : :class:`list` of :class:`str`, optional
The names of the sheets that are in the spreadsheet.
Returns
-------
:class:`str`
The ID of the spreadsheet that was created.
"""
body = {'properties': {'title': name}}
if sheet_names:
body['sheets'] = [{
'properties': {'title': sn}
} for sn in sheet_names]
response = self._spreadsheets.create(body=body).execute()
return response['spreadsheetId']
[docs] def sheet_names(self, spreadsheet_id):
"""Get the names of all sheets in a spreadsheet.
Parameters
----------
spreadsheet_id : :class:`str`
The ID of a Google Sheets file.
Returns
-------
:class:`tuple` of :class:`str`
The names of all sheets.
"""
request = self._spreadsheets.get(spreadsheetId=spreadsheet_id)
response = request.execute()
return tuple(r['properties']['title'] for r in response['sheets'])
[docs] def values(self,
spreadsheet_id,
sheet=None,
cells=None,
row_major=True,
value_option=GValueOption.FORMATTED,
datetime_option=GDateTimeOption.SERIAL_NUMBER
):
"""Return a range of values from a spreadsheet.
Parameters
----------
spreadsheet_id : :class:`str`
The ID of a Google Sheets file.
sheet : :class:`str`, optional
The name of a sheet in the spreadsheet to read the values from.
If not specified and only one sheet exists in the spreadsheet
then automatically determines the sheet name; however, it is
more efficient to specify the name of the sheet.
cells : :class:`str`, optional
The A1 notation or R1C1 notation of the range to retrieve values
from. If not specified then returns all values that are in `sheet`.
row_major : :class:`bool`, optional
Whether to return the values in row-major or column-major order.
value_option : :class:`str` or :class:`GValueOption`, optional
How values should be represented in the output. If a string
then it must be equal to one of the values in :class:`GValueOption`.
datetime_option : :class:`str` or :class:`GDateTimeOption`, optional
How dates, times, and durations should be represented in the
output. If a string then it must be equal to one of the values
in :class:`GDateTimeOption`. This argument is ignored if
`value_option` is :attr:`GValueOption.FORMATTED`.
Returns
-------
:class:`list`
The values from the sheet.
"""
if hasattr(value_option, 'value'):
value_option = value_option.value
if hasattr(datetime_option, 'value'):
datetime_option = datetime_option.value
response = self._spreadsheets.values().get(
spreadsheetId=spreadsheet_id,
range=self._get_range(sheet, cells, spreadsheet_id),
majorDimension='ROWS' if row_major else 'COLUMNS',
valueRenderOption=value_option,
dateTimeRenderOption=datetime_option
).execute()
return response.get('values', [])
[docs] def cells(self, spreadsheet_id, ranges=None):
"""Return cells from a spreadsheet.
Parameters
----------
spreadsheet_id : :class:`str`
The ID of a Google Sheets file.
ranges : :class:`str` or :class:`list` of :class:`str`, optional
The ranges to retrieve from the spreadsheet. Examples:
* ``'Sheet1'`` :math:`\\rightarrow` return all cells from
the sheet named Sheet1
* ``'Sheet1!A1:H5'`` :math:`\\rightarrow` return cells A1:H5
from the sheet named Sheet1
* ``['Sheet1!A1:H5', 'Data', 'Devices!B4:B9']`` :math:`\\rightarrow`
return cells A1:H5 from the sheet named Sheet1, all cells from the
sheet named Data and cells B4:B9 from the sheet named Devices
If not specified then return all cells from all sheets.
Returns
-------
:class:`dict`
The cells from the spreadsheet. The keys are the names of the
sheets and the values are a :class:`list` of :class:`GCell`
objects for the specified range of each sheet.
"""
response = self._spreadsheets.get(
spreadsheetId=spreadsheet_id,
includeGridData=True,
ranges=ranges,
).execute()
cells = {}
for sheet in response['sheets']:
data = []
for item in sheet['data']:
for row in item.get('rowData', []):
row_data = []
for col in row.get('values', []):
effective_value = col.get('effectiveValue', None)
formatted = col.get('formattedValue', '')
if effective_value is None:
value = None
typ = GCellType.EMPTY
elif 'numberValue' in effective_value:
value = effective_value['numberValue']
t = col.get('effectiveFormat', {}).get('numberFormat', {}).get('type', 'NUMBER')
try:
typ = GCellType(t)
except ValueError:
typ = GCellType.UNKNOWN
elif 'stringValue' in effective_value:
value = effective_value['stringValue']
typ = GCellType.STRING
elif 'boolValue' in effective_value:
value = effective_value['boolValue']
typ = GCellType.BOOLEAN
elif 'errorValue' in effective_value:
msg = effective_value['errorValue']['message']
value = '{} ({})'.format(col['formattedValue'], msg)
typ = GCellType.ERROR
else:
value = formatted
typ = GCellType.UNKNOWN
row_data.append(GCell(value=value, type=typ, formatted=formatted))
data.append(row_data)
cells[sheet['properties']['title']] = data
return cells
[docs] @staticmethod
def to_datetime(value):
"""Convert a "serial number" date into a :class:`datetime.datetime`.
Parameters
----------
value : :class:`float`
A date in the "serial number" format.
Returns
-------
:class:`datetime.datetime`
The date converted.
"""
days = int(value)
seconds = (value - days) * 86400 # 60 * 60 * 24
return GSheets.SERIAL_NUMBER_ORIGIN + timedelta(days=days, seconds=seconds)
def _get_range(self, sheet, cells, spreadsheet_id):
if not sheet:
names = self.sheet_names(spreadsheet_id)
if len(names) != 1:
sheets = ', '.join(repr(n) for n in names)
raise ValueError('You must specify a sheet name: ' + sheets)
_range = names[0]
else:
_range = sheet
if cells:
_range += '!{}'.format(cells)
return _range
@staticmethod
def _values(values):
"""The append() and update() API methods require a list of lists."""
if not isinstance(values, (list, tuple)):
return [[values]]
if values and not isinstance(values[0], (list, tuple)):
return [values]
return values
[docs]class GMail(GoogleAPI):
def __init__(self, account=None, credentials=None, scopes=None):
"""Interact with Gmail.
.. attention::
You must follow the instructions in the prerequisites section for setting up the
`Gmail API <https://developers.google.com/gmail/api/quickstart/python#prerequisites>`_
before you can use this class. It is also useful to be aware of the
`refresh token expiration <https://developers.google.com/identity/protocols/oauth2#expiration>`_
policy.
Parameters
----------
account : :class:`str`, optional
Since a person may have multiple Google accounts, and multiple people
may run the same code, this parameter decides which token to load
to authenticate with the Google API. The value can be any text (or
:data:`None`) that you want to associate with a particular Google
account, provided that it contains valid characters for a filename.
The value that you chose when you authenticated with your `credentials`
should be used for all future instances of this class to access that
particular Google account. You can associate a different value with
a Google account at any time (by passing in a different `account`
value), but you will be asked to authenticate with your `credentials`
again, or, alternatively, you can rename the token files located in
:const:`~msl.io.constants.HOME_DIR` to match the new `account` value.
credentials : :class:`str`, optional
The path to the `client secrets` OAuth credential file. This
parameter only needs to be specified the first time that you
authenticate with a particular Google account or if you delete
the token file that was created when you previously authenticated.
scopes : :class:`list` of :class:`str`, optional
The list of scopes to enable for the Google API. See
`Gmail scopes <https://developers.google.com/identity/protocols/oauth2/scopes#gmail>`_
for more details. If not specified then default scopes are chosen.
"""
if not scopes:
scopes = [
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.metadata'
]
super(GMail, self).__init__(
'gmail', 'v1', credentials, scopes, False, account)
self._my_email_address = None
self._users = self._service.users()
[docs] def profile(self):
"""Gets the authenticated user's Gmail profile.
Returns
-------
:class:`dict`
Returns the following
.. code-block:: console
{
'email_address': string, The authenticated user's email address
'messages_total': integer, The total number of messages in the mailbox
'threads_total': integer, The total number of threads in the mailbox
'history_id': string, The ID of the mailbox's current history record
}
"""
profile = self._users.getProfile(userId='me').execute()
return {
'email_address': profile['emailAddress'],
'messages_total': profile['messagesTotal'],
'threads_total': profile['threadsTotal'],
'history_id': profile['historyId'],
}
[docs] def send(self, recipients, sender='me', subject=None, body=None):
"""Send an email.
Parameters
----------
recipients : :class:`str` or :class:`list` of :class:`str`
The email address(es) of the recipient(s). The value ``'me'``
can be used to indicate the authenticated user.
sender : :class:`str`, optional
The email address of the sender. The value ``'me'``
can be used to indicate the authenticated user.
subject : :class:`str`, optional
The text to include in the subject field.
body : :class:`str`, optional
The text to include in the body of the email. The text can be
enclosed in ``<html></html>`` tags to use HTML elements to format
the message.
See Also
--------
:func:`~msl.io.utils.send_email`
"""
if isinstance(recipients, str):
recipients = [recipients]
for i in range(len(recipients)):
if recipients[i] == 'me':
if self._my_email_address is None:
self._my_email_address = self.profile()['email_address']
recipients[i] = self._my_email_address
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = ', '.join(recipients)
msg['Subject'] = subject or '(no subject)'
text = body or ''
subtype = 'html' if text.startswith('<html>') else 'plain'
msg.attach(MIMEText(text, subtype))
self._users.messages().send(
userId=sender,
body={'raw': b64encode(msg.as_bytes()).decode()}
).execute()