How to create a custom Document Loader
Overview
Applications based on LLMs frequently entail extracting data from databases or files, like PDFs, and converting it into a format that LLMs can utilize. In LangChain, this usually involves creating Document objects, which encapsulate the extracted text (page_content) along with metadata, a dictionary containing details about the document, such as the author's name or the date of publication.
Document objects are often formatted into prompts that are fed into an LLM, allowing the LLM to use the information in the Document to generate a desired response (e.g., summarizing the document).
Documents can be either used immediately or indexed into a vectorstore for future retrieval and use.
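For reference, here is a minimal illustration of what such a Document looks like (the content and metadata values below are placeholders):
from langchain_core.documents import Document

# page_content holds the extracted text; metadata is an arbitrary dict
# describing where the text came from.
doc = Document(
    page_content="Revenue grew 12% year over year...",
    metadata={"source": "report.pdf", "author": "Jane Doe"},
)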
The main abstractions for Document Loading are:
| Component | Description | 
|---|---|
| Document | Contains text and metadata | 
| BaseLoader | Used to convert raw data into Documents | 
| Blob | A representation of binary data that's located either in a file or in memory | 
| BaseBlobParser | Logic to parse a Blob to yield Document objects | 
This guide will demonstrate how to write custom document loading and file parsing logic; specifically, we'll see how to:
- Create a standard document loader by sub-classing from BaseLoader.
- Create a parser using BaseBlobParser and use it in conjunction with Blob and BlobLoaders. This is useful primarily when working with files.
Standard Document Loader
A document loader can be implemented by sub-classing from a BaseLoader which provides a standard interface for loading documents.
Interface
| Method Name | Explanation | 
|---|---|
| lazy_load | Used to load documents one by one lazily. Use for production code. | 
| alazy_load | Async variant of lazy_load | 
| load | Used to load all the documents into memory eagerly. Use for prototyping or interactive work. | 
| aload | Used to load all the documents into memory eagerly. Use for prototyping or interactive work. Added in 2024-04 to LangChain. | 
- The load method is a convenience method meant solely for prototyping work -- it just invokes list(self.lazy_load()).
- The alazy_load has a default implementation that will delegate to lazy_load. If you're using async, we recommend overriding the default implementation and providing a native async implementation.
When implementing a document loader do NOT provide parameters via the lazy_load or alazy_load methods.
All configuration is expected to be passed through the initializer (__init__). This was a design choice made by LangChain to make sure that once a document loader has been instantiated it has all the information needed to load documents.
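As a minimal sketch of that convention (the class name and its encoding parameter are purely illustrative; the full worked example follows in the Implementation section):
from typing import Iterator

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document


class SketchLoader(BaseLoader):
    """Illustrative only: every setting is captured at construction time."""

    def __init__(self, file_path: str, encoding: str = "utf-8") -> None:
        self.file_path = file_path  # lazy_load reads configuration from self...
        self.encoding = encoding  # ...rather than accepting extra arguments

    def lazy_load(self) -> Iterator[Document]:  # <-- still takes no parameters
        with open(self.file_path, encoding=self.encoding) as f:
            yield Document(page_content=f.read(), metadata={"source": self.file_path})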
Installation
Install langchain-core and langchain_community.
%pip install -qqU langchain_core langchain_community
Implementation
Let's create an example of a standard document loader that loads a file and creates a document from each line in the file.
from typing import AsyncIterator, Iterator
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
class CustomDocumentLoader(BaseLoader):
    """An example document loader that reads a file line by line."""
    def __init__(self, file_path: str) -> None:
        """Initialize the loader with a file path.
        Args:
            file_path: The path to the file to load.
        """
        self.file_path = file_path
    def lazy_load(self) -> Iterator[Document]:  # <-- Does not take any arguments
        """A lazy loader that reads a file line by line.
        When you're implementing lazy load methods, you should use a generator
        to yield documents one by one.
        """
        with open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1
    # alazy_load is OPTIONAL.
    # If you leave out the implementation, a default implementation which delegates to lazy_load will be used!
    async def alazy_load(
        self,
    ) -> AsyncIterator[Document]:  # <-- Does not take any arguments
        """An async lazy loader that reads a file line by line."""
        # Requires aiofiles
        # Install with `pip install aiofiles`
        # https://github.com/Tinche/aiofiles
        import aiofiles
        async with aiofiles.open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            async for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1
Test 🧪
To test out the document loader, we need a file with some quality content.
with open("./meow.txt", "w", encoding="utf-8") as f:
    quality_content = "meow meow🐱 \n meow meow🐱 \n meow😻😻"
    f.write(quality_content)
loader = CustomDocumentLoader("./meow.txt")
%pip install -q aiofiles
## Test out the lazy load interface
for doc in loader.lazy_load():
    print()
    print(type(doc))
    print(doc)
<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱 
' metadata={'line_number': 0, 'source': './meow.txt'}
<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱 
' metadata={'line_number': 1, 'source': './meow.txt'}
<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}
## Test out the async implementation
async for doc in loader.alazy_load():
    print()
    print(type(doc))
    print(doc)
<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱 
' metadata={'line_number': 0, 'source': './meow.txt'}
<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱 
' metadata={'line_number': 1, 'source': './meow.txt'}
<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}
load() can be helpful in an interactive environment such as a Jupyter notebook.
Avoid using it for production code since eager loading assumes that all the content can fit into memory, which is not always the case, especially for enterprise data.
loader.load()
[Document(metadata={'line_number': 0, 'source': './meow.txt'}, page_content='meow meow🐱 \n'),
 Document(metadata={'line_number': 1, 'source': './meow.txt'}, page_content=' meow meow🐱 \n'),
 Document(metadata={'line_number': 2, 'source': './meow.txt'}, page_content=' meow😻😻')]
Working with Files
Many document loaders involve parsing files. The difference between such loaders usually stems from how the file is parsed, rather than how the file is loaded. For example, you can use open to read the binary content of either a PDF or a markdown file, but you need different parsing logic to convert that binary data into text.
As a result, it can be helpful to decouple the parsing logic from the loading logic, which makes it easier to re-use a given parser regardless of how the data was loaded.
BaseBlobParser
A BaseBlobParser is an interface that accepts a blob and outputs a list of Document objects. A blob is a representation of data that lives either in memory or in a file. LangChain Python has a Blob primitive that is inspired by the Blob WebAPI spec.
from langchain_core.document_loaders import BaseBlobParser, Blob
class MyParser(BaseBlobParser):
    """A simple parser that creates a document from each line."""
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parse a blob into a document line by line."""
        line_number = 0
        with blob.as_bytes_io() as f:
            for line in f:
                line_number += 1
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": blob.source},
                )
blob = Blob.from_path("./meow.txt")
parser = MyParser()
list(parser.lazy_parse(blob))
[Document(metadata={'line_number': 1, 'source': './meow.txt'}, page_content='meow meow🐱 \n'),
 Document(metadata={'line_number': 2, 'source': './meow.txt'}, page_content=' meow meow🐱 \n'),
 Document(metadata={'line_number': 3, 'source': './meow.txt'}, page_content=' meow😻😻')]
Using the blob API also allows one to load content directly from memory without having to read it from a file!
blob = Blob(data=b"some data from memory\nmeow")
list(parser.lazy_parse(blob))
[Document(metadata={'line_number': 1, 'source': None}, page_content='some data from memory\n'),
 Document(metadata={'line_number': 2, 'source': None}, page_content='meow')]
Blob
Let's take a quick look at some of the Blob API.
blob = Blob.from_path("./meow.txt", metadata={"foo": "bar"})
blob.encoding
'utf-8'
blob.as_bytes()
b'meow meow\xf0\x9f\x90\xb1 \n meow meow\xf0\x9f\x90\xb1 \n meow\xf0\x9f\x98\xbb\xf0\x9f\x98\xbb'
blob.as_string()
'meow meow🐱 \n meow meow🐱 \n meow😻😻'
blob.as_bytes_io()
<contextlib._GeneratorContextManager at 0x7f89cf9336d0>
blob.metadata
{'foo': 'bar'}
blob.source
'./meow.txt'
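Since as_bytes_io() returns a context manager, the usual pattern is to consume it inside a with block. A small sketch reusing the blob from above:
# as_bytes_io() yields a file-like object over the blob's bytes.
with blob.as_bytes_io() as f:
    for line in f:
        print(line)  # each line is a bytes object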
Blob Loaders
While a parser encapsulates the logic needed to parse binary data into documents, blob loaders encapsulate the logic that's necessary to load blobs from a given storage location.
At the moment, LangChain supports FileSystemBlobLoader and CloudBlobLoader.
You can use the FileSystemBlobLoader to load blobs and then use the parser to parse them.
from langchain_community.document_loaders.blob_loaders import FileSystemBlobLoader
filesystem_blob_loader = FileSystemBlobLoader(
    path=".", glob="*.mdx", show_progress=True
)
%pip install -q tqdm
parser = MyParser()
for blob in filesystem_blob_loader.yield_blobs():
    for doc in parser.lazy_parse(blob):
        print(doc)
        break
0it [00:00, ?it/s]
Or, you can use CloudBlobLoader to load blobs from a cloud storage location (supports the s3://, az://, gs://, and file:// schemes).
%pip install -q cloudpathlib[s3]
from langchain_community.document_loaders.blob_loaders import CloudBlobLoader
cloud_blob_loader = CloudBlobLoader(
    url="https://ee-files.s3.amazonaws.com/files/106410/download/821953/7-http-s3.amazonaws.comrosco.pdf",
    glob="*.pdf",
    show_progress=True,
)
for blob in cloud_blob_loader.yield_blobs():
    print(blob)
---------------------------------------------------------------------------
NoCredentialsError                        Traceback (most recent call last)
...
NoCredentialsError: Unable to locate credentials
Generic Loader
LangChain has a GenericLoader abstraction which composes a BlobLoader with a BaseBlobParser.
GenericLoader is meant to provide standardized classmethods that make it easy to use existing BlobLoader implementations. At the moment, the FileSystemBlobLoader and CloudBlobLoader are supported.
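For example, GenericLoader.from_filesystem constructs the FileSystemBlobLoader for you. Here is a short sketch (the path and glob are placeholders, and the parser keyword is assumed to accept the custom parser defined earlier):
from langchain_community.document_loaders.generic import GenericLoader

# Classmethod shortcut: builds a FileSystemBlobLoader internally and pairs it
# with the supplied parser.
loader_via_classmethod = GenericLoader.from_filesystem(
    path=".", glob="*.mdx", show_progress=True, parser=MyParser()
)
You can also compose a blob loader and a parser yourself via the constructor: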
from langchain_community.document_loaders.generic import GenericLoader
generic_loader_filesystem = GenericLoader(
    blob_loader=filesystem_blob_loader, blob_parser=parser
)
for idx, doc in enumerate(generic_loader_filesystem.lazy_load()):
    if idx < 5:
        print(doc)
print("... output truncated for demo purposes")
... output truncated for demo purposes
from langchain_community.document_loaders.generic import GenericLoader
generic_loader_cloud = GenericLoader(blob_loader=cloud_blob_loader, blob_parser=parser)
for idx, doc in enumerate(generic_loader_cloud.lazy_load()):
    if idx < 5:
        print(doc)
print("... output truncated for demo purposes")
Custom Generic Loader
If you really like creating classes, you can sub-class GenericLoader to encapsulate the loading and parsing logic together.
Sub-classing lets you associate a default parser with the class while loading content through an existing blob loader.
from typing import Any
class MyCustomLoader(GenericLoader):
    @staticmethod
    def get_parser(**kwargs: Any) -> BaseBlobParser:
        """Override this method to associate a default parser with the class."""
        return MyParser()
loader = MyCustomLoader.from_filesystem(path=".", glob="*.mdx", show_progress=True)
for idx, doc in enumerate(loader.lazy_load()):
    if idx < 5:
        print(doc)
print("... output truncated for demo purposes")
0it [00:00, ?it/s]
... output truncated for demo purposes