from copy import deepcopy
import pandas as pd
import simplejson as json
from tamr_unify_client.attribute.collection import AttributeCollection
from tamr_unify_client.base_resource import BaseResource
from tamr_unify_client.dataset.profile import DatasetProfile
from tamr_unify_client.dataset.status import DatasetStatus
from tamr_unify_client.dataset.uri import DatasetURI
from tamr_unify_client.dataset.usage import DatasetUsage
from tamr_unify_client.operation import Operation
class Dataset(BaseResource):
    """A Tamr dataset."""

    @classmethod
    def from_json(cls, client, resource_json, api_path=None):
        """Build a dataset resource from its JSON representation.

        :param client: Client used to talk to the server.
        :param resource_json: JSON data describing the dataset.
        :type resource_json: dict
        :param api_path: API path of the dataset. Optional, default is `None`.
        :type api_path: str
        :return: Whatever the parent class's ``from_data`` constructs for this class.
        """
        return super().from_data(client, resource_json, api_path)

    @property
    def name(self):
        """:type: str"""
        return self._data.get("name")

    @property
    def external_id(self):
        """:type: str"""
        return self._data.get("externalId")

    @property
    def description(self):
        """:type: str"""
        return self._data.get("description")

    @property
    def version(self):
        """:type: str"""
        return self._data.get("version")

    @property
    def tags(self):
        """A copy of the dataset's tags; empty if the data carries no tags.

        :type: list[str]
        """
        # Copy so callers cannot mutate the cached resource data; tolerate a
        # missing key instead of failing with a TypeError on ``None[:]``.
        return list(self._data.get("tags") or [])

    @property
    def key_attribute_names(self):
        """A copy of the key attribute names; empty if the data carries none.

        :type: list[str]
        """
        return list(self._data.get("keyAttributeNames") or [])

    @property
    def attributes(self):
        """Attributes of this dataset.

        :return: Attributes of this dataset.
        :rtype: :class:`~tamr_unify_client.attribute.collection.AttributeCollection`
        """
        alias = self.api_path + "/attributes"
        return AttributeCollection(self.client, alias)

    def _update_records(self, updates, **json_args):
        """Send a batch of record creations/updates/deletions to this dataset.

        You probably want to use :func:`~tamr_unify_client.dataset.resource.Dataset.upsert_records`
        or :func:`~tamr_unify_client.dataset.resource.Dataset.delete_records` instead.

        :param updates: Each update should be formatted as specified in the `Public Docs for Dataset updates <https://docs.tamr.com/reference#modify-a-datasets-records>`_.
        :type updates: iterable[dict]
        :param `**json_args`: Arguments to pass to the JSON `dumps` function, as documented `here <https://simplejson.readthedocs.io/en/latest/#simplejson.dumps>`_.
            Some of these, such as `indent`, may not work with Tamr.
        :returns: JSON response body from server.
        :rtype: :py:class:`dict`
        """
        # Lazily serialize one update at a time so that large batches are
        # streamed in the request body rather than materialized in memory.
        stringified_updates = (
            json.dumps(update, **json_args).encode("utf-8") for update in updates
        )
        return (
            self.client.post(
                self.api_path + ":updateRecords",
                headers={"Content-Encoding": "utf-8"},
                data=stringified_updates,
            )
            .successful()
            .json()
        )

    def upsert_from_dataframe(
        self, df: pd.DataFrame, *, primary_key_name: str, ignore_nan: bool = True
    ) -> dict:
        """Upserts a record for each row of `df` with attributes for each column in `df`.

        Args:
            df: The data to upsert records from.
            primary_key_name: The name of the primary key of the dataset. Must be a column of `df`.
            ignore_nan: Whether to convert `NaN` values to `null` before upserting records to Tamr. If `False` and `NaN` is in `df`, this function will fail. Optional, default is `True`.

        Returns:
            JSON response body from the server.

        Raises:
            KeyError: If `primary_key_name` is not a column in `df`.
        """
        if primary_key_name not in df.columns:
            raise KeyError(f"{primary_key_name} is not an attribute of the data")

        records = df.to_dict(orient="records")
        # ``ignore_nan`` is forwarded to the JSON ``dumps`` call as a keyword argument.
        return self.upsert_records(records, primary_key_name, ignore_nan=ignore_nan)

    def upsert_records(self, records, primary_key_name, **json_args):
        """Creates or updates the specified records.

        :param records: The records to update, as dictionaries.
        :type records: iterable[dict]
        :param primary_key_name: The name of the primary key for these records, which must be a key in each record dictionary.
        :type primary_key_name: str
        :param `**json_args`: Arguments to pass to the JSON `dumps` function, as documented `here <https://simplejson.readthedocs.io/en/latest/#simplejson.dumps>`_.
            Some of these, such as `indent`, may not work with Tamr.
        :return: JSON response body from the server.
        :rtype: dict
        """
        updates = (
            {"action": "CREATE", "recordId": record[primary_key_name], "record": record}
            for record in records
        )
        return self._update_records(updates, **json_args)

    def delete_records(self, records, primary_key_name):
        """Deletes the specified records.

        :param records: The records to delete, as dictionaries.
        :type records: iterable[dict]
        :param primary_key_name: The name of the primary key for these records, which must be a key in each record dictionary.
        :type primary_key_name: str
        :return: JSON response body from the server.
        :rtype: dict
        """
        ids = (record[primary_key_name] for record in records)
        return self.delete_records_by_id(ids)

    def delete_records_by_id(self, record_ids):
        """Deletes the specified records.

        :param record_ids: The IDs of the records to delete.
        :type record_ids: iterable
        :return: JSON response body from the server.
        :rtype: dict
        """
        updates = ({"action": "DELETE", "recordId": rid} for rid in record_ids)
        return self._update_records(updates)

    def delete_all_records(self):
        """Removes all records from the dataset.

        :return: HTTP response from the server
        :rtype: :class:`requests.Response`
        """
        path = self.api_path + "/records"
        return self.client.delete(path).successful()

    def refresh(self, **options):
        """Brings dataset up-to-date if needed, taking whatever actions are required.

        :param ``**options``: Options passed to underlying :class:`~tamr_unify_client.operation.Operation` .
            See :func:`~tamr_unify_client.operation.Operation.apply_options` .
        :returns: The refresh operation.
        :rtype: :class:`~tamr_unify_client.operation.Operation`
        """
        response = self.client.post(self.api_path + ":refresh").successful()
        op = Operation.from_response(self.client, response)
        return op.apply_options(**options)

    def profile(self):
        """Returns profile information for a dataset.

        If profile information has not been generated, call create_profile() first.
        If the returned profile information is out-of-date, you can call refresh() on the returned
        object to bring it up-to-date.

        :return: Dataset Profile information.
        :rtype: :class:`~tamr_unify_client.dataset.profile.DatasetProfile`
        """
        alias = self.api_path + "/profile"
        profile_json = self.client.get(alias).successful().json()
        return DatasetProfile.from_json(self.client, profile_json, api_path=alias)

    def create_profile(self, **options):
        """Create a profile for this dataset.

        If a profile already exists, the existing profile will be brought
        up to date.

        :param ``**options``: Options passed to underlying :class:`~tamr_unify_client.operation.Operation` .
            See :func:`~tamr_unify_client.operation.Operation.apply_options` .
        :return: The operation to create the profile.
        :rtype: :class:`~tamr_unify_client.operation.Operation`
        """
        response = self.client.post(self.api_path + "/profile:refresh").successful()
        op = Operation.from_response(self.client, response)
        return op.apply_options(**options)

    def records(self):
        """Stream this dataset's records as Python dictionaries.

        The request is only issued once iteration starts. An unsuccessful
        response fails at that point, via the response's ``successful()``
        check, instead of having its body parsed as if it were a record.

        :return: Stream of records.
        :rtype: Python generator yielding :py:class:`dict`
        """
        with self.client.get(self.api_path + "/records", stream=True) as response:
            response.successful()
            for line in response.iter_lines():
                # Skip blank keep-alive lines; they are not JSON documents.
                if line:
                    yield json.loads(line)

    def status(self):
        """Retrieve this dataset's streamability status.

        :return: Dataset streamability status.
        :rtype: :class:`~tamr_unify_client.dataset.status.DatasetStatus`
        """
        alias = self.api_path + "/status"
        status_json = self.client.get(alias).successful().json()
        return DatasetStatus.from_json(self.client, status_json, api_path=alias)

    def usage(self):
        """Retrieve this dataset's usage by recipes and downstream datasets.

        :return: The dataset's usage.
        :rtype: :class:`~tamr_unify_client.dataset.usage.DatasetUsage`
        """
        alias = self.api_path + "/usage"
        usage = self.client.get(alias).successful().json()
        return DatasetUsage.from_json(self.client, usage, alias)

    def from_geo_features(self, features, geo_attr=None):
        """Upsert this dataset from a geospatial FeatureCollection or iterable of Features.

        `features` can be:

        - An object that implements ``__geo_interface__`` as a FeatureCollection
          (see https://gist.github.com/sgillies/2217756)
        - An iterable of features, where each element is a feature dictionary or an object
          that implements the ``__geo_interface__`` as a Feature
        - A map of type "FeatureCollection" where the "features" key contains an
          iterable of features

        See: geopandas.GeoDataFrame.from_features()

        If geo_attr is provided, then the named Tamr attribute will be used for the geometry.
        If geo_attr is not provided, then the first attribute on the dataset with geometry type
        will be used for the geometry.

        :param features: geospatial features
        :param geo_attr: (optional) name of the Tamr attribute to use for the feature's geometry
        :type geo_attr: str
        """
        if hasattr(features, "__geo_interface__"):
            features = features.__geo_interface__
        # Unwrap a FeatureCollection mapping into its iterable of features.
        if hasattr(features, "get") and features.get("type") == "FeatureCollection":
            features = features["features"]

        key_attrs = self.key_attribute_names
        # A single key goes under "recordId"; multi-attribute keys go under
        # "compositeRecordId".
        record_id = "recordId" if len(key_attrs) == 1 else "compositeRecordId"

        if geo_attr is None:
            geo_attr = self._geo_attr

        self._update_records(
            self._features_to_updates(features, record_id, key_attrs, geo_attr)
        )

    def upstream_datasets(self):
        """The Dataset's upstream datasets.

        API returns the URIs of the upstream datasets,
        resulting in a list of DatasetURIs, not actual Datasets.

        :return: A list of the Dataset's upstream datasets.
        :rtype: list[:class:`~tamr_unify_client.dataset.uri.DatasetURI`]
        """
        alias = self.api_path + "/upstreamDatasets"
        resources = self.client.get(alias).successful().json()
        return [DatasetURI(self.client, uri) for uri in resources]

    def spec(self):
        """Returns this dataset's spec.

        :return: The spec of this dataset.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return DatasetSpec.of(self)

    def delete(self, cascade=False):
        """Deletes this dataset, optionally deleting all derived datasets as well.

        :param cascade: Whether to delete all datasets derived from this one. Optional, default is `False`.
            Do not use this option unless you are certain you need it as it can have unintended consequences.
        :type cascade: bool
        :return: HTTP response from the server
        :rtype: :class:`requests.Response`
        """
        params = {"cascade": cascade}
        return self.client.delete(self.api_path, params=params).successful()

    @property
    def __geo_interface__(self):
        """Retrieve a representation of this dataset that conforms to the Python Geo Interface.

        Note that this materializes all features; for a streaming interface to features,
        see :func:`~tamr_unify_client.dataset.resource.Dataset.itergeofeatures`

        See https://gist.github.com/sgillies/2217756

        :return: dict[str, object]
        """
        return {
            "type": "FeatureCollection",
            "features": list(self.itergeofeatures()),
        }

    def itergeofeatures(self, geo_attr=None):
        """Returns an iterator that yields feature dictionaries that comply with __geo_interface__

        See https://gist.github.com/sgillies/2217756

        :param geo_attr: (optional) name of the Tamr attribute to use for the feature's geometry
        :type geo_attr: str
        :return: stream of features
        :rtype: Python generator yielding :py:class:`dict[str, object]`
        """
        key_attrs = self.key_attribute_names

        # A single-attribute key yields a scalar feature id; a composite key
        # yields a list of values in key-attribute order.
        if len(key_attrs) == 1:

            def key_value(rec):
                return rec[key_attrs[0]]

        else:

            def key_value(rec):
                return [rec[attr] for attr in key_attrs]

        if geo_attr is None:
            geo_attr = self._geo_attr

        for record in self.records():
            yield self._record_to_feature(record, key_value, key_attrs, geo_attr)

    @property
    def _geo_attr(self):
        """The name of the attribute that contains geometry

        :return: the name of the first attribute that looks like geometry, or
            `None` if there is no such attribute
        :rtype: str
        """
        geo_names = self._geo_attr_names()
        # Duck-typing: the first RECORD attribute with at least one
        # geometry-named sub-attribute is selected as the geometry.
        for attr in self.attributes:
            if attr.type.base_type != "RECORD":
                continue
            sub_names = {sub_attr.name for sub_attr in attr.type.attributes}
            if geo_names.intersection(sub_names):
                return attr.name
        return None

    @staticmethod
    def _record_to_feature(record, key_value, key_attrs, geo_attr):
        """Convert a Tamr record to a Python Geo Interface Feature

        The feature only gets a "geometry" entry when `geo_attr` is set and
        present in the record; that entry is `None` when the attribute holds
        no populated geometry.

        :param record: Tamr record
        :param key_value: Function to extract the value of the primary key from the record
        :param key_attrs: Set of attributes that comprise the primary key for the record
        :param geo_attr: The singular attribute to use as the geometry
        :return: map from str to object
        """
        feature = {"type": "Feature", "id": key_value(record)}
        # Key, geometry and bbox attributes are represented structurally and
        # therefore excluded from the feature's "properties".
        reserved = {"bbox", geo_attr}.union(key_attrs)

        if geo_attr and geo_attr in record:
            feature["geometry"] = None
            src_geo = record[geo_attr]
            if src_geo:
                for unify_attr in Dataset._geo_attr_names():
                    if unify_attr in src_geo and src_geo[unify_attr]:
                        feature["geometry"] = {
                            # Convert e.g. multiLineString -> MultiLineString
                            "type": unify_attr[0].upper() + unify_attr[1:],
                            "coordinates": src_geo[unify_attr],
                        }
                        break

        if "bbox" in record:
            feature["bbox"] = record["bbox"]

        non_reserved = set(record.keys()).difference(reserved)
        if non_reserved:
            feature["properties"] = {attr: record[attr] for attr in non_reserved}
        return feature

    @staticmethod
    def _feature_to_record(feature, key_attrs, geo_attr):
        """Convert a Python Geo Interface Feature to a Tamr record

        feature can be a dict representing a Geospatial Feature, or a Feature object
        that implements the __geo_interface__ property.

        :param feature: Python Geo Interface Feature
        :param key_attrs: Sequence of attributes that comprise the primary key for the record
        :param geo_attr: The singular attribute on the record to use for the geometry
        :return: dict
        :raises ValueError: If the feature has no "id" or its "id" is `None`.
        """
        if hasattr(feature, "__geo_interface__"):
            feature = feature.__geo_interface__

        record = {}
        props = feature.get("properties")
        if props:
            record.update(props)

        geometry = feature.get("geometry")
        if geometry:
            geo_type = geometry["type"]
            # Convert e.g. "MultiLineString" -> "multiLineString"
            geo_type = geo_type[0].lower() + geo_type[1:]
            record[geo_attr] = {geo_type: geometry["coordinates"]}

        bbox = feature.get("bbox")
        if bbox:
            record["bbox"] = bbox

        if "id" not in feature or feature["id"] is None:
            raise ValueError("id must have a non-null value")

        if key_attrs[1:]:
            # Composite key: the feature id is a sequence aligned with key_attrs.
            key_values = feature["id"]
            for i, attr in enumerate(key_attrs):
                record[attr] = key_values[i]
        else:
            record[key_attrs[0]] = feature["id"]
        return record

    @staticmethod
    def _features_to_updates(features, id_attr, key_attrs, geo_attr):
        """Lazily convert features into "CREATE" updates for this dataset.

        :param features: Iterable of feature dictionaries or objects that
            implement ``__geo_interface__`` as a Feature
        :param id_attr: Key under which each feature's id is placed in the update
        :param key_attrs: Sequence of attributes that comprise the primary key
        :param geo_attr: The singular attribute on the record to use for the geometry
        :return: generator yielding update dictionaries
        """
        for feature in features:
            # Resolve Feature objects to their mapping form up front so that
            # the id lookup below works for them as well as for plain dicts.
            if hasattr(feature, "__geo_interface__"):
                feature = feature.__geo_interface__
            yield {
                "action": "CREATE",
                id_attr: feature["id"],
                "record": Dataset._feature_to_record(feature, key_attrs, geo_attr),
            }

    def __repr__(self):
        return (
            f"{self.__class__.__module__}."
            f"{self.__class__.__qualname__}("
            f"relative_id={self.relative_id!r}, "
            f"name={self.name!r}, "
            f"version={self.version!r})"
        )

    @staticmethod
    def _geo_attr_names():
        """Names of the sub-attributes that mark a record attribute as geometry.

        :return: the geometry sub-attribute names
        :rtype: set[str]
        """
        return {
            "point",
            "multiPoint",
            "lineString",
            "multiLineString",
            "polygon",
            "multiPolygon",
        }
class DatasetSpec:
    """A representation of the server view of a dataset."""

    def __init__(self, client, data, api_path):
        """Store the client, raw data and API path backing this spec.

        :param client: Client used by :func:`put` to talk to the server.
        :param data: The spec's data, in the API representation.
        :type data: dict
        :param api_path: API path of the dataset this spec describes.
        :type api_path: str
        """
        self.client = client
        self._data = data
        self.api_path = api_path

    @staticmethod
    def of(resource):
        """Creates a dataset spec from a dataset.

        The dataset's data is deep-copied, so the spec does not share state
        with the resource it was created from.

        :param resource: The existing dataset.
        :type resource: :class:`~tamr_unify_client.dataset.resource.Dataset`
        :return: The corresponding dataset spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return DatasetSpec(resource.client, deepcopy(resource._data), resource.api_path)

    @staticmethod
    def new():
        """Creates a blank spec that could be used to construct a new dataset.

        The blank spec has no client and no API path.

        :return: The empty spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return DatasetSpec(None, {}, None)

    def from_data(self, data):
        """Creates a spec with the same client and API path as this one, but new data.

        :param data: The data for the new spec.
        :type data: dict
        :return: The new spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return DatasetSpec(self.client, data, self.api_path)

    def to_dict(self):
        """Returns a version of this spec that conforms to the API representation.

        The result is a deep copy, so modifying it does not affect this spec.

        :returns: The spec's dict.
        :rtype: dict
        """
        return deepcopy(self._data)

    def with_name(self, new_name):
        """Creates a new spec with the same properties, updating name.

        :param new_name: The new name.
        :type new_name: str
        :return: A new spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return self.from_data({**self._data, "name": new_name})

    def with_external_id(self, new_external_id):
        """Creates a new spec with the same properties, updating external ID.

        :param new_external_id: The new external ID.
        :type new_external_id: str
        :return: A new spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return self.from_data({**self._data, "externalId": new_external_id})

    def with_description(self, new_description):
        """Creates a new spec with the same properties, updating description.

        :param new_description: The new description.
        :type new_description: str
        :return: A new spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return self.from_data({**self._data, "description": new_description})

    def with_key_attribute_names(self, new_key_attribute_names):
        """Creates a new spec with the same properties, updating key attribute names.

        :param new_key_attribute_names: The new key attribute names.
        :type new_key_attribute_names: list[str]
        :return: A new spec.
        :rtype: :class:`~tamr_unify_client.dataset.resource.DatasetSpec`
        """
        return self.from_data(
            {**self._data, "keyAttributeNames": new_key_attribute_names}
        )

    def put(self):
        """Updates the dataset on the server.

        Sends this spec's data to the spec's API path with an HTTP PUT and
        wraps the response body in a dataset resource.

        :return: The modified dataset.
        :rtype: :class:`~tamr_unify_client.dataset.resource.Dataset`
        """
        new_data = self.client.put(self.api_path, json=self._data).successful().json()
        return Dataset.from_json(self.client, new_data, self.api_path)

    def __repr__(self):
        return (
            f"{self.__class__.__module__}."
            f"{self.__class__.__qualname__}("
            f"dict={self._data})"
        )