Langchain
zenml.integrations.langchain
special
Initialization of the langchain integration.
LangchainIntegration (Integration)
Definition of langchain integration for ZenML.
Source code in zenml/integrations/langchain/__init__.py
class LangchainIntegration(Integration):
    """Definition of langchain integration for ZenML."""

    NAME = LANGCHAIN
    REQUIREMENTS = ["langchain>=0.0.116"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.langchain import materializers  # noqa
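The integration is normally installed with `zenml integration install langchain` and activated by ZenML automatically. The sketch below shows the equivalent explicit calls for illustration only; it assumes a `langchain` package satisfying `REQUIREMENTS` is installed and that the standard `Integration.check_installation()` helper is available.

```python
# Illustrative sketch: explicitly checking and activating the integration.
# ZenML normally performs these steps for you when the integration is installed.
from zenml.integrations.langchain import LangchainIntegration

if LangchainIntegration.check_installation():  # verifies langchain>=0.0.116
    # Importing the materializers module registers the Langchain
    # materializers with ZenML's materializer registry.
    LangchainIntegration.activate()
```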
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/langchain/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.langchain import materializers  # noqa
materializers
special
Initialization of the langchain materializer.
document_materializer
Implementation of ZenML's Langchain Document materializer.
LangchainDocumentMaterializer (BaseMaterializer)
Handle Langchain Document objects.
Source code in zenml/integrations/langchain/materializers/document_materializer.py
class LangchainDocumentMaterializer(BaseMaterializer):
    """Handle Langchain Document objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = (Document,)

    def __init__(self, **kwargs: Any) -> None:
        """Initializes the Langchain Document materializer.

        Args:
            **kwargs: Keyword arguments.
        """
        super().__init__(**kwargs)
        self._pydantic_materializer = PydanticMaterializer(**kwargs)

    def load(self, data_type: Type[Document]) -> Any:
        """Reads Langchain Document from JSON.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        return self._pydantic_materializer.load(data_type)

    def save(self, data: Document) -> None:
        """Serialize a Langchain Document to JSON.

        Args:
            data: The data to store.
        """
        self._pydantic_materializer.save(data)

    def extract_metadata(self, data: Document) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given Langchain Document object.

        Args:
            data: The Langchain Document object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        return self._pydantic_materializer.extract_metadata(data)
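As a usage sketch (import paths vary between ZenML and Langchain versions; the step and pipeline names below are made up), steps can return and consume `Document` objects directly once this materializer is registered:

```python
from langchain.docstore.document import Document
from zenml import pipeline, step


@step
def create_document() -> Document:
    # Stored as JSON by LangchainDocumentMaterializer (via the Pydantic materializer).
    return Document(page_content="ZenML + Langchain", metadata={"source": "docs"})


@step
def read_document(document: Document) -> str:
    # The Document is rebuilt from the stored JSON before this step runs.
    return document.page_content


@pipeline
def document_pipeline() -> None:
    read_document(create_document())
```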
__init__(self, **kwargs)
special
Initializes the Langchain Document materializer.
Parameters:
Name | Type | Description | Default
---|---|---|---
`**kwargs` | `Any` | Keyword arguments. | `{}`
Source code in zenml/integrations/langchain/materializers/document_materializer.py
def __init__(self, **kwargs: Any) -> None:
    """Initializes the Langchain Document materializer.

    Args:
        **kwargs: Keyword arguments.
    """
    super().__init__(**kwargs)
    self._pydantic_materializer = PydanticMaterializer(**kwargs)
extract_metadata(self, data)
Extract metadata from the given Langchain Document object.
Parameters:
Name | Type | Description | Default
---|---|---|---
`data` | `Document` | The Langchain Document object to extract metadata from. | required
Returns:
Type | Description
---|---
`Dict[str, 'MetadataType']` | The extracted metadata as a dictionary.
Source code in zenml/integrations/langchain/materializers/document_materializer.py
def extract_metadata(self, data: Document) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given Langchain Document object.

    Args:
        data: The Langchain Document object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return self._pydantic_materializer.extract_metadata(data)
load(self, data_type)
Reads Langchain Document from JSON.
Parameters:
Name | Type | Description | Default
---|---|---|---
`data_type` | `Type[Document]` | The type of the data to read. | required
Returns:
Type | Description
---|---
`Any` | The data read.
Source code in zenml/integrations/langchain/materializers/document_materializer.py
def load(self, data_type: Type[Document]) -> Any:
    """Reads Langchain Document from JSON.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    return self._pydantic_materializer.load(data_type)
save(self, data)
Serialize a Langchain Document to JSON.
Parameters:
Name | Type | Description | Default
---|---|---|---
`data` | `Document` | The data to store. | required
Source code in zenml/integrations/langchain/materializers/document_materializer.py
def save(self, data: Document) -> None:
    """Serialize a Langchain Document to JSON.

    Args:
        data: The data to store.
    """
    self._pydantic_materializer.save(data)
openai_embedding_materializer
Implementation of the Langchain OpenAI embedding materializer.
LangchainOpenaiEmbeddingMaterializer (BaseMaterializer)
Handle langchain openai embedding objects.
Source code in zenml/integrations/langchain/materializers/openai_embedding_materializer.py
class LangchainOpenaiEmbeddingMaterializer(BaseMaterializer):
    """Handle langchain openai embedding objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.MODEL
    ASSOCIATED_TYPES = (OpenAIEmbeddings,)

    def load(self, data_type: Type[OpenAIEmbeddings]) -> OpenAIEmbeddings:
        """Reads an OpenAI embedding from a pickle file.

        Args:
            data_type: The type of the embedding.

        Returns:
            The embedding.
        """
        super().load(data_type)

        # validate langchain package version
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        source_langchain_version = read_file_contents_as_string(
            langchain_version_filepath
        )
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        if source_langchain_version != package_version:
            logger.warn(
                f"Your `OpenAIEmbedding` was materialized with {source_langchain_version} "
                f"but you are currently using {package_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        # validate python version
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        source_python_version = read_file_contents_as_string(
            python_version_filepath
        )
        current_python_version = Environment().python_version()
        if source_python_version != current_python_version:
            logger.warn(
                f"Your `OpenAIEmbedding` was materialized with {source_python_version} "
                f"but you are currently using {current_python_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "rb") as fid:
            embedding = pickle.load(fid)
        return cast(OpenAIEmbeddings, embedding)

    def save(self, embedding: OpenAIEmbeddings) -> None:
        """Save an OpenAI embedding as a pickle file.

        Args:
            embedding: The embedding to save.
        """
        super().save(embedding)

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "wb") as fid:
            pickle.dump(embedding, fid)

        # save python version for validation on loading
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        current_python_version = Environment().python_version()
        write_file_contents_as_string(
            python_version_filepath, current_python_version
        )

        # save langchain package version for validation on loading
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        write_file_contents_as_string(
            langchain_version_filepath, package_version
        )
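A minimal usage sketch (assuming `openai` is installed and an API key is configured; the step name is hypothetical): an `OpenAIEmbeddings` object returned from a step is pickled by this materializer, together with the Python and langchain versions that are re-checked on load.

```python
from langchain.embeddings.openai import OpenAIEmbeddings
from zenml import step


@step
def create_embeddings() -> OpenAIEmbeddings:
    # The returned object is pickled; version metadata is written next to it.
    return OpenAIEmbeddings()
```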
load(self, data_type)
Reads an OpenAI embedding from a pickle file.
Parameters:
Name | Type | Description | Default
---|---|---|---
`data_type` | `Type[langchain.embeddings.openai.OpenAIEmbeddings]` | The type of the embedding. | required
Returns:
Type | Description
---|---
`OpenAIEmbeddings` | The embedding.
Source code in zenml/integrations/langchain/materializers/openai_embedding_materializer.py
def load(self, data_type: Type[OpenAIEmbeddings]) -> OpenAIEmbeddings:
    """Reads an OpenAI embedding from a pickle file.

    Args:
        data_type: The type of the embedding.

    Returns:
        The embedding.
    """
    super().load(data_type)

    # validate langchain package version
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    source_langchain_version = read_file_contents_as_string(
        langchain_version_filepath
    )
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    if source_langchain_version != package_version:
        logger.warn(
            f"Your `OpenAIEmbedding` was materialized with {source_langchain_version} "
            f"but you are currently using {package_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    # validate python version
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    source_python_version = read_file_contents_as_string(
        python_version_filepath
    )
    current_python_version = Environment().python_version()
    if source_python_version != current_python_version:
        logger.warn(
            f"Your `OpenAIEmbedding` was materialized with {source_python_version} "
            f"but you are currently using {current_python_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "rb") as fid:
        embedding = pickle.load(fid)
    return cast(OpenAIEmbeddings, embedding)
save(self, embedding)
Save an OpenAI embedding as a pickle file.
Parameters:
Name | Type | Description | Default
---|---|---|---
`embedding` | `OpenAIEmbeddings` | The embedding to save. | required
Source code in zenml/integrations/langchain/materializers/openai_embedding_materializer.py
def save(self, embedding: OpenAIEmbeddings) -> None:
    """Save an OpenAI embedding as a pickle file.

    Args:
        embedding: The embedding to save.
    """
    super().save(embedding)

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "wb") as fid:
        pickle.dump(embedding, fid)

    # save python version for validation on loading
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    current_python_version = Environment().python_version()
    write_file_contents_as_string(
        python_version_filepath, current_python_version
    )

    # save langchain package version for validation on loading
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    write_file_contents_as_string(
        langchain_version_filepath, package_version
    )
vector_store_materializer
Implementation of the langchain vector store materializer.
LangchainVectorStoreMaterializer (BaseMaterializer)
Handle langchain vector store objects.
Source code in zenml/integrations/langchain/materializers/vector_store_materializer.py
class LangchainVectorStoreMaterializer(BaseMaterializer):
    """Handle langchain vector store objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = (VectorStore,)

    def load(self, data_type: Type[VectorStore]) -> VectorStore:
        """Reads a langchain vector store from a pickle file.

        Args:
            data_type: The type of the vector store.

        Returns:
            The vector store.
        """
        super().load(data_type)

        # validate langchain package version
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        source_langchain_version = read_file_contents_as_string(
            langchain_version_filepath
        )
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        if source_langchain_version != package_version:
            logger.warn(
                f"Your `VectorStore` was materialized with {source_langchain_version} "
                f"but you are currently using {package_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        # validate python version
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        source_python_version = read_file_contents_as_string(
            python_version_filepath
        )
        current_python_version = Environment().python_version()
        if source_python_version != current_python_version:
            logger.warn(
                f"Your `VectorStore` was materialized with {source_python_version} "
                f"but you are currently using {current_python_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "rb") as fid:
            vector_store = pickle.load(fid)
        return cast(VectorStore, vector_store)

    def save(self, vector_store: VectorStore) -> None:
        """Save a langchain vector store as a pickle file.

        Args:
            vector_store: The vector store to save.
        """
        super().save(vector_store)

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "wb") as fid:
            pickle.dump(vector_store, fid)

        # save python version for validation on loading
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        current_python_version = Environment().python_version()
        write_file_contents_as_string(
            python_version_filepath, current_python_version
        )

        # save langchain package version for validation on loading
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        write_file_contents_as_string(
            langchain_version_filepath, package_version
        )
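A minimal usage sketch (assuming `faiss-cpu` and `openai` are installed; the step name is illustrative): any `VectorStore` subclass returned from a step is pickled by this materializer along with the version metadata checked on load.

```python
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.vectorstores.base import VectorStore
from zenml import step


@step
def index_documents() -> VectorStore:
    texts = ["ZenML orchestrates ML pipelines.", "Langchain builds LLM apps."]
    # FAISS.from_texts embeds the texts and builds an in-memory index,
    # which is then pickled by LangchainVectorStoreMaterializer.
    return FAISS.from_texts(texts, OpenAIEmbeddings())
```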
load(self, data_type)
Reads a langchain vector store from a pickle file.
Parameters:
Name | Type | Description | Default
---|---|---|---
`data_type` | `Type[langchain.vectorstores.base.VectorStore]` | The type of the vector store. | required
Returns:
Type | Description
---|---
`VectorStore` | The vector store.
Source code in zenml/integrations/langchain/materializers/vector_store_materializer.py
def load(self, data_type: Type[VectorStore]) -> VectorStore:
    """Reads a langchain vector store from a pickle file.

    Args:
        data_type: The type of the vector store.

    Returns:
        The vector store.
    """
    super().load(data_type)

    # validate langchain package version
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    source_langchain_version = read_file_contents_as_string(
        langchain_version_filepath
    )
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    if source_langchain_version != package_version:
        logger.warn(
            f"Your `VectorStore` was materialized with {source_langchain_version} "
            f"but you are currently using {package_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    # validate python version
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    source_python_version = read_file_contents_as_string(
        python_version_filepath
    )
    current_python_version = Environment().python_version()
    if source_python_version != current_python_version:
        logger.warn(
            f"Your `VectorStore` was materialized with {source_python_version} "
            f"but you are currently using {current_python_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "rb") as fid:
        vector_store = pickle.load(fid)
    return cast(VectorStore, vector_store)
save(self, vector_store)
Save a langchain vector store as a pickle file.
Parameters:
Name | Type | Description | Default
---|---|---|---
`vector_store` | `VectorStore` | The vector store to save. | required
Source code in zenml/integrations/langchain/materializers/vector_store_materializer.py
def save(self, vector_store: VectorStore) -> None:
    """Save a langchain vector store as a pickle file.

    Args:
        vector_store: The vector store to save.
    """
    super().save(vector_store)

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "wb") as fid:
        pickle.dump(vector_store, fid)

    # save python version for validation on loading
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    current_python_version = Environment().python_version()
    write_file_contents_as_string(
        python_version_filepath, current_python_version
    )

    # save langchain package version for validation on loading
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    write_file_contents_as_string(
        langchain_version_filepath, package_version
    )
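Outside a pipeline, materialized Langchain artifacts can be loaded back through the ZenML client, which dispatches to the registered materializer. A sketch (the pipeline and step names are hypothetical, and the exact client API differs slightly across ZenML versions):

```python
from zenml.client import Client

# Fetch the output of the step that produced the vector store and unpickle it
# via LangchainVectorStoreMaterializer.
run = Client().get_pipeline("index_pipeline").last_run
vector_store = run.steps["index_documents"].output.load()
```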