from datetime import date, datetime
from functools import cached_property, partialmethod
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Iterator,
List,
Literal,
Optional,
Sequence,
Union,
)
import pydantic
from typing_extensions import Self
from pyairtable.exceptions import InvalidParameterError, MissingRecordError
from pyairtable.models._base import AirtableModel, rebuild_models
from pyairtable.models.audit import AuditLogResponse
from pyairtable.models.schema import (
EnterpriseInfo,
NestedId,
Package,
UserGroup,
UserInfo,
)
from pyairtable.utils import (
Url,
UrlBuilder,
cache_unless_forced,
coerce_iso_str,
coerce_list_str,
enterprise_only,
)
if TYPE_CHECKING:
from pyairtable.api.api import Api
from pyairtable.api.base import Base
from pyairtable.api.workspace import Workspace
[docs]@enterprise_only
class Enterprise:
"""
Represents an Airtable enterprise account.
>>> enterprise = api.enterprise("entUBq2RGdihxl3vU")
>>> enterprise.info().workspace_ids
['wspmhESAta6clCCwF', ...]
"""
class _urls(UrlBuilder):
#: URL for retrieving basic information about the enterprise.
meta = Url("meta/enterpriseAccounts/{id}")
#: URL for retrieving information about all users.
users = meta / "users"
#: URL for retrieving information about all user groups.
groups = Url("meta/groups")
#: URL for claiming a user into an enterprise.
claim_users = meta / "claim/users"
#: URL for retrieving audit log events.
audit_log = meta / "auditLogEvents"
#: URL for managing descendant enterprise accounts.
descendants = meta / "descendants"
#: URL for moving user groups between enterprise accounts.
move_groups = meta / "moveGroups"
#: URL for moving workspaces between enterprise accounts.
move_workspaces = meta / "moveWorkspaces"
#: URL for creating a new workspace.
create_workspace = Url("meta/workspaces")
#: URL for listing enterprise packages.
packages = meta / "packages"
[docs] def package_install(self, package_id: str) -> Url:
"""
URL for installing a package (creating a base from a package).
"""
return self.meta / "packages" / package_id / "install"
[docs] def user(self, user_id: str) -> Url:
"""
URL for retrieving information about a single user.
"""
return self.users / user_id
[docs] def group(self, group_id: str) -> Url:
"""
URL for retrieving information about a single user group.
"""
return self.groups / group_id
[docs] def admin_access(self, action: Literal["grant", "revoke"]) -> Url:
"""
URL for granting or revoking admin access to one or more users.
"""
return self.meta / f"users/{action}AdminAccess"
[docs] def remove_user(self, user_id: str) -> Url:
"""
URL for removing a user from the enterprise.
"""
return self.user(user_id) / "remove"
#: URL for granting admin access to one or more users.
grant_admin = partialmethod(admin_access, "grant")
#: URL for revoking admin access from one or more users.
revoke_admin = partialmethod(admin_access, "revoke")
urls = cached_property(_urls)
[docs] def __init__(self, api: "Api", enterprise_id: str):
self.api = api
self.id = enterprise_id
self._info: Optional[EnterpriseInfo] = None
[docs] @cache_unless_forced
def info(
self,
*,
aggregated: bool = False,
descendants: bool = False,
) -> EnterpriseInfo:
"""
Retrieve basic information about the enterprise, caching the result.
Calls `Get enterprise <https://airtable.com/developers/web/api/get-enterprise>`__.
Args:
aggregated: if ``True``, include aggregated values across the enterprise.
descendants: if ``True``, include information about the enterprise's descendant orgs.
"""
include = []
if aggregated:
include.append("aggregated")
if descendants:
include.append("descendants")
params = {"include": include}
response = self.api.get(self.urls.meta, params=params)
return EnterpriseInfo.from_api(response, self.api)
[docs] def group(self, group_id: str, collaborations: bool = True) -> UserGroup:
"""
Retrieve information on a single user group with the given ID.
Args:
group_id: A user group ID (``grpQBq2RGdihxl3vU``).
collaborations: If ``False``, no collaboration data will be requested
from Airtable. This may result in faster responses.
"""
params = {"include": ["collaborations"] if collaborations else []}
payload = self.api.get(self.urls.group(group_id), params=params)
return UserGroup.model_validate(payload)
[docs] def user(
self,
id_or_email: str,
*,
collaborations: bool = True,
aggregated: bool = False,
descendants: bool = False,
) -> UserInfo:
"""
Retrieve information on a single user with the given ID or email.
Args:
id_or_email: A user ID (``usrQBq2RGdihxl3vU``) or email address.
collaborations: If ``False``, no collaboration data will be requested
from Airtable. This may result in faster responses.
aggregated: If ``True``, includes the user's aggregated values
across this enterprise account and its descendants.
descendants: If ``True``, includes information about the user
in a ``dict`` keyed per descendant enterprise account.
"""
users = self.users(
[id_or_email],
collaborations=collaborations,
aggregated=aggregated,
descendants=descendants,
)
return users[0]
[docs] def users(
self,
ids_or_emails: Iterable[str],
*,
collaborations: bool = True,
aggregated: bool = False,
descendants: bool = False,
) -> List[UserInfo]:
"""
Retrieve information on the users with the given IDs or emails.
Read more at `Get users by ID or email <https://airtable.com/developers/web/api/get-users-by-id-or-email>`__.
Args:
ids_or_emails: A sequence of user IDs (``usrQBq2RGdihxl3vU``)
or email addresses (or both).
collaborations: If ``False``, no collaboration data will be requested
from Airtable. This may result in faster responses.
aggregated: If ``True``, includes the user's aggregated values
across this enterprise account and its descendants.
descendants: If ``True``, includes information about the user
in a ``dict`` keyed per descendant enterprise account.
"""
user_ids: List[str] = []
emails: List[str] = []
for value in ids_or_emails:
(emails if "@" in value else user_ids).append(value)
include = []
if collaborations:
include.append("collaborations")
if aggregated:
include.append("aggregated")
if descendants:
include.append("descendants")
response = self.api.get(
url=self.urls.users,
params={
"id": user_ids,
"email": emails,
"include": include,
},
)
# key by user ID to avoid returning duplicates
users = {
info.id: info
for user_obj in response["users"]
if (info := UserInfo.from_api(user_obj, self.api, context=self))
}
return list(users.values())
[docs] def audit_log(
self,
*,
page_size: Optional[int] = None,
page_limit: Optional[int] = None,
sort_asc: Optional[bool] = False,
previous: Optional[str] = None,
next: Optional[str] = None,
start_time: Optional[Union[str, date, datetime]] = None,
end_time: Optional[Union[str, date, datetime]] = None,
user_id: Optional[Union[str, Iterable[str]]] = None,
event_type: Optional[Union[str, Iterable[str]]] = None,
model_id: Optional[Union[str, Iterable[str]]] = None,
category: Optional[Union[str, Iterable[str]]] = None,
) -> Iterator[AuditLogResponse]:
"""
Retrieve and yield results from the `Audit Log <https://airtable.com/developers/web/api/audit-logs-integration-guide>`__,
one page of results at a time. Each result is an instance of :class:`~pyairtable.models.audit.AuditLogResponse`
and contains the pagination IDs returned from the API, as described in the linked documentation.
By default, the Airtable API will return up to 180 days of audit log events, going backwards from most recent.
Retrieving all records may take some time, but is as straightforward as:
>>> enterprise = Enterprise("entYourEnterpriseId")
>>> events = [
... event
... for page in enterprise.audit_log()
... for event in page.events
... ]
If you are creating a record of all audit log events, you probably want to start with the earliest
events in the retention window and iterate chronologically. You'll likely have a job running
periodically in the background, so you'll need some way to persist the pagination IDs retrieved
from the API in case that job is interrupted and needs to be restarted.
The sample code below will use a local file to remember the next page's ID, so that if the job is
interrupted, it will resume where it left off (potentially processing some entries twice).
.. code-block:: python
import os
import shelve
import pyairtable
def handle_event(event):
print(event)
api = pyairtable.Api(os.environ["AIRTABLE_API_KEY"])
enterprise = api.enterprise(os.environ["AIRTABLE_ENTERPRISE_ID"])
persistence = shelve.open("audit_log.db")
first_page = persistence.get("next", None)
for page in enterprise.audit_log(sort_asc=True, next=first_page):
for event in page.events:
handle_event(event)
persistence["next"] = page.pagination.next
For more information on any of the keyword parameters below, refer to the
`audit log events <https://airtable.com/developers/web/api/audit-log-events>`__
API documentation.
Args:
page_size: How many events per page to return (maximum 100).
page_limit: How many pages to return before stopping.
sort_asc: Whether to sort in ascending order (earliest to latest)
rather than descending order (latest to earliest).
previous: Requests the previous page of results from the given ID.
See the `audit log integration guide <https://airtable.com/developers/web/api/audit-logs-integration-guide>`__
for more information on pagination parameters.
next: Requests the next page of results according to the given ID.
See the `audit log integration guide <https://airtable.com/developers/web/api/audit-logs-integration-guide>`__
for more information on pagination parameters.
start_time: Earliest timestamp to retrieve (inclusive).
end_time: Latest timestamp to retrieve (inclusive).
originating_user_id: Retrieve audit log events originating
from the provided user ID or IDs (maximum 100).
event_type: Retrieve audit log events falling under the provided
`audit log event type <https://airtable.com/developers/web/api/audit-log-event-types>`__
or types (maximum 100).
model_id: Retrieve audit log events taking action on, or involving,
the provided model ID or IDs (maximum 100).
category: Retrieve audit log events belonging to the provided
audit log event category or categories.
Returns:
An object representing a single page of audit log results.
"""
start_time = coerce_iso_str(start_time)
end_time = coerce_iso_str(end_time)
user_id = coerce_list_str(user_id)
event_type = coerce_list_str(event_type)
model_id = coerce_list_str(model_id)
category = coerce_list_str(category)
params = {
"startTime": start_time,
"endTime": end_time,
"originatingUserId": user_id,
"eventType": event_type,
"modelId": model_id,
"category": category,
"pageSize": page_size,
"sortOrder": ("ascending" if sort_asc else "descending"),
"previous": previous,
"next": next,
}
params = {k: v for (k, v) in params.items() if v}
offset_field = "next" if sort_asc else "previous"
iter_requests = self.api.iterate_requests(
method="GET",
url=self.urls.audit_log,
params=params,
offset_field=offset_field,
)
for count, response in enumerate(iter_requests, start=1):
parsed = AuditLogResponse.model_validate(response)
yield parsed
if not parsed.events:
return
if page_limit is not None and count >= page_limit:
return
[docs] def remove_user(
self,
user_id: str,
replacement: Optional[str] = None,
*,
descendants: bool = False,
) -> "UserRemoved":
"""
Unshare a user from all enterprise workspaces, bases, and interfaces.
If applicable, the user will also be removed from as an enterprise admin.
See `Remove user from enterprise <https://airtable.com/developers/web/api/remove-user-from-enterprise>`__
for more information.
Args:
user_id: The user ID.
replacement: If the user is the sole owner of any workspaces, you must
specify a replacement user ID to be added as the new owner of such
workspaces. If the user is not the sole owner of any workspaces,
this is optional and will be ignored if provided.
descendants: If ``True``, removes the user from descendant enterprise accounts.
"""
url = self.urls.remove_user(user_id)
payload: Dict[str, Any] = {"isDryRun": False}
if replacement:
payload["replacementOwnerId"] = replacement
if descendants:
payload["removeFromDescendants"] = True
response = self.api.post(url, json=payload)
return UserRemoved.from_api(response, self.api, context=self)
[docs] def claim_users(
self, users: Dict[str, Literal["managed", "unmanaged"]]
) -> "ManageUsersResponse":
"""
Batch manage organizations enterprise account users. This endpoint allows you
to change a user's membership status from being unmanaged to being an
organization member, and vice versa.
See `Manage user membership <https://airtable.com/developers/web/api/manage-user-membership>`__
for more information.
Args:
users: A ``dict`` mapping user IDs or emails to the desired state,
either ``"managed"`` or ``"unmanaged"``.
"""
payload = {
"users": [
{
("email" if "@" in key else "id"): key,
"state": value,
}
for (key, value) in users.items()
]
}
response = self.api.post(self.urls.claim_users, json=payload)
return ManageUsersResponse.from_api(response, self.api, context=self)
[docs] def delete_users(self, emails: Iterable[str]) -> "DeleteUsersResponse":
"""
Delete multiple users by email.
Args:
emails: A list or other iterable of email addresses.
"""
response = self.api.delete(self.urls.users, params={"email": list(emails)})
return DeleteUsersResponse.from_api(response, self.api, context=self)
[docs] def grant_admin(self, *users: Union[str, UserInfo]) -> "ManageUsersResponse":
"""
Grant admin access to one or more users.
Args:
users: One or more user IDs, email addresses, or instances of
:class:`~pyairtable.models.schema.UserInfo`.
"""
return self._post_admin_access("grant", users)
[docs] def revoke_admin(self, *users: Union[str, UserInfo]) -> "ManageUsersResponse":
"""
Revoke admin access to one or more users.
Args:
users: One or more user IDs, email addresses, or instances of
:class:`~pyairtable.models.schema.UserInfo`.
"""
return self._post_admin_access("revoke", users)
def _post_admin_access(
self, action: Literal["grant", "revoke"], users: Iterable[Union[str, UserInfo]]
) -> "ManageUsersResponse":
response = self.api.post(
self.urls.admin_access(action),
json={
"users": [
{"email": user_id} if "@" in user_id else {"id": user_id}
for user in users
for user_id in [user.id if isinstance(user, UserInfo) else user]
]
},
)
return ManageUsersResponse.from_api(response, self.api, context=self)
[docs] def create_descendant(self, name: str) -> Self:
"""
Creates a descendant enterprise account of the enterprise account.
Descendant enterprise accounts can only be created for root enterprise accounts
with the Enterprise Hub feature enabled.
See `Create descendant enterprise <https://airtable.com/developers/web/api/create-descendant-enterprise>`__.
Args:
name: The name to give the new account.
"""
response = self.api.post(self.urls.descendants, json={"name": name})
return self.__class__(self.api, response["id"])
[docs] def move_groups(
self,
group_ids: Iterable[str],
target: Union[str, Self],
) -> "MoveGroupsResponse":
"""
Move one or more user groups from the current enterprise account
into a different enterprise account within the same organization.
See `Move user groups <https://airtable.com/developers/web/api/move-user-groups>`__.
Args:
group_ids: User group IDs.
target: The ID of the target enterprise, or an instance of :class:`~pyairtable.Enterprise`.
"""
if isinstance(target, Enterprise):
target = target.id
response = self.api.post(
self.urls.move_groups,
json={
"groupIds": group_ids,
"targetEnterpriseAccountId": target,
},
)
return MoveGroupsResponse.from_api(response, self.api, context=self)
[docs] def move_workspaces(
self,
workspace_ids: Iterable[str],
target: Union[str, Self],
) -> "MoveWorkspacesResponse":
"""
Move one or more workspaces from the current enterprise account
into a different enterprise account within the same organization.
See `Move workspaces <https://airtable.com/developers/web/api/move-workspaces>`__.
Args:
workspace_ids: The list of workspace IDs.
target: The ID of the target enterprise, or an instance of :class:`~pyairtable.Enterprise`.
"""
if isinstance(target, Enterprise):
target = target.id
response = self.api.post(
self.urls.move_workspaces,
json={
"workspaceIds": workspace_ids,
"targetEnterpriseAccountId": target,
},
)
return MoveWorkspacesResponse.from_api(response, self.api, context=self)
[docs] def create_workspace(self, name: str) -> "Workspace":
"""
Creates a new workspace with the provided name within the enterprise account
and returns the workspace ID. The requesting user must be an active effective
admin of the enterprise account; the created workspace's owner will be the user
who makes the request.
See `Create workspace <https://airtable.com/developers/web/api/create-workspace>`__.
Args:
name: The name of the workspace to be created.
Returns:
The ID of the newly created workspace.
"""
response = self.api.post(
self.urls.create_workspace,
json={
"enterpriseAccountId": self.id,
"name": name,
},
)
return self.api.workspace(str(response["id"]))
[docs] @cache_unless_forced
def packages(
self,
*,
all_enterprises: bool = False,
) -> List[Package]:
"""
List all packages for the enterprise account.
See `List packages <https://airtable.com/developers/web/api/list-enterprise-packages>`__.
Args:
all_enterprises: If True and the enterprise account is the root
enterprise account, returns all packages across the entire
enterprise grid. Defaults to False.
Returns:
A list of Package objects representing the enterprise packages.
"""
params: Dict[str, Any] = {}
if all_enterprises:
params["shouldGetAllPackagesInGrid"] = True
response = self.api.get(self.urls.packages, params=params)
return [
Package.from_api(pkg, self.api, context=self)
for pkg in response.get("packages", [])
]
[docs] def package(self, package_id: str, *, force: bool = False) -> Package:
"""
Retrieve information about a single package by ID.
Args:
package_id: The ID of the package to retrieve.
force: If ``True``, forces a refresh of the cached package list.
Returns:
A Package object representing the enterprise package.
"""
try:
return next(
package
for package in self.packages(force=force)
if package.id == package_id
)
except StopIteration:
raise MissingRecordError(package_id)
[docs] def create_base(
self,
workspace: Union[str, "Workspace"],
name: str,
tables: Sequence[Dict[str, Any]],
) -> "Base":
"""
Create a base in the given workspace.
See https://airtable.com/developers/web/api/create-base
Args:
workspace: The ID of the workspace or a :class:`~pyairtable.Workspace` object.
name: The name to give to the new base. Does not need to be unique.
tables: A list of ``dict`` objects that conform to Airtable's
`Table model <https://airtable.com/developers/web/api/model/table-model>`__.
"""
if isinstance(workspace, str):
workspace = self.api.workspace(workspace)
return workspace.create_base(name, tables)
[docs] def create_base_from_package(
self,
workspace: Union[str, "Workspace"],
name: str,
package_or_release: Union[str, Package],
*,
description: Optional[str] = None,
) -> "Base":
"""
Create a base from an enterprise package template in the specified workspace.
See https://airtable.com/developers/web/api/create-base-from-package-enterprise
Args:
workspace: The ID of the workspace or a :class:`~pyairtable.Workspace` object.
name: The name for the new base.
package_or_release: A :class:`~pyairtable.models.schema.Package` object,
a package ID (``pkg...``), or a package release ID. When a package
or package ID is given, the package's latest release is installed.
Any other string is forwarded to the API as the release ID.
description: Optional description for the base.
Returns:
The newly created Base object.
Raises:
MissingRecordError: If the given package ID is not found.
InvalidParameterError: If the resolved package has no latest release.
"""
workspace_id = workspace if isinstance(workspace, str) else workspace.id
if isinstance(package_or_release, Package):
package_id = package_or_release.id
release_id = package_or_release.latest_release_id
elif package_or_release.startswith("pkg"):
package = self.package(package_or_release)
package_id = package.id
release_id = package.latest_release_id
else:
package_id = release_id = package_or_release
if release_id is None:
raise InvalidParameterError(
f"Package {package_id!r} has no latest release to install"
)
payload = {
"name": name,
"packageReleaseId": release_id,
"workspaceId": workspace_id,
}
if description is not None:
payload["description"] = description
response = self.api.post(self.urls.package_install(package_id), json=payload)
return self.api.base(response["id"], validate=True, force=True)
[docs]class UserRemoved(AirtableModel):
"""
Returned from the `Remove user from enterprise <https://airtable.com/developers/web/api/remove-user-from-enterprise>`__
endpoint.
"""
was_user_removed_as_admin: bool
shared: "UserRemoved.Shared"
unshared: "UserRemoved.Unshared"
[docs] class Shared(AirtableModel):
workspaces: List["UserRemoved.Shared.Workspace"]
[docs] class Workspace(AirtableModel):
permission_level: str
workspace_id: str
workspace_name: str
user_id: str = ""
deleted_time: Optional[datetime] = None
enterprise_account_id: Optional[str] = None
[docs] class Unshared(AirtableModel):
bases: List["UserRemoved.Unshared.Base"]
interfaces: List["UserRemoved.Unshared.Interface"]
workspaces: List["UserRemoved.Unshared.Workspace"]
[docs] class Base(AirtableModel):
user_id: str
base_id: str
base_name: str
former_permission_level: str
deleted_time: Optional[datetime] = None
enterprise_account_id: Optional[str] = None
[docs] class Interface(AirtableModel):
user_id: str
base_id: str
interface_id: str
interface_name: str
former_permission_level: str
deleted_time: Optional[datetime] = None
enterprise_account_id: Optional[str] = None
[docs] class Workspace(AirtableModel):
user_id: str
former_permission_level: str
workspace_id: str
workspace_name: str
deleted_time: Optional[datetime] = None
enterprise_account_id: Optional[str] = None
[docs]class DeleteUsersResponse(AirtableModel):
"""
Returned from the `Delete users by email <https://airtable.com/developers/web/api/delete-users-by-email>`__
endpoint.
"""
deleted_users: List["DeleteUsersResponse.UserInfo"]
errors: List["DeleteUsersResponse.Error"]
[docs] class UserInfo(AirtableModel):
id: str
email: str
[docs] class Error(AirtableModel):
type: str
email: str
message: Optional[str] = None
[docs]class ManageUsersResponse(AirtableModel):
"""
Returned from the `Manage user membership <https://airtable.com/developers/web/api/manage-user-membership>`__,
`Grant admin access <https://airtable.com/developers/web/api/grant-admin-access>`__, and
`Revoke admin access <https://airtable.com/developers/web/api/revoke-admin-access>`__
endpoints.
"""
errors: List["ManageUsersResponse.Error"] = pydantic.Field(default_factory=list)
[docs] class Error(AirtableModel):
id: Optional[str] = None
email: Optional[str] = None
type: str
message: str
[docs]class MoveError(AirtableModel):
id: str
type: str
message: str
[docs]class MoveGroupsResponse(AirtableModel):
"""
Returned by `Move user groups <https://airtable.com/developers/web/api/move-user-groups>`__.
"""
moved_groups: List[NestedId] = pydantic.Field(default_factory=list)
errors: List[MoveError] = pydantic.Field(default_factory=list)
[docs]class MoveWorkspacesResponse(AirtableModel):
"""
Returned by `Move workspaces <https://airtable.com/developers/web/api/move-workspaces>`__.
"""
moved_workspaces: List[NestedId] = pydantic.Field(default_factory=list)
errors: List[MoveError] = pydantic.Field(default_factory=list)
rebuild_models(vars())