Introduction

The core purpose of this syncing pipeline is to establish a structured, efficient, and self-updating local representation of the raw PDF debates dataset hosted at https://llms.openstate.eu/. We aim to transform this dataset into a format suited to finetuning language models, in which debates are represented as sequences of messages associated with speakers, metadata, and relevant context.

The nature of language models necessitates a consistent structure for training data. PDF files, while containing the debate content, lack inherent organization in terms of speaker identification, debate subject, and the clear delineation of individual messages. This pipeline converts these PDFs into a model-friendly format.

Pipeline Structure

The syncing pipeline follows a carefully designed sequential process. Each step focuses on synchronizing a specific type of resource, building upon the output of previous steps. Let's outline the steps:

  1. download_json:
    • Downloads JSON index files from the remote dataset.
    • Compares these with existing local JSON files to determine which files need to be downloaded, updated, or removed.
  2. insert_doc_index:
    • Inserts the metadata (e.g., title, type, date) contained within the JSON files into the doc_index database table, creating a registry of documents on the remote server.
  3. download_doc:
    • Downloads the raw PDF files based on the information in the doc_index.
    • Intelligently handles updates to existing documents and removes local PDFs deleted on the remote.
  4. symlink_doc:
    • Creates symbolic links to downloaded PDFs, sorted into folders based on doctype_slug (e.g., "antwoord-schriftelijke-vragen") and Vergaderjaar (e.g. 2017-2018). This aids in browsing and understanding structural patterns.
  5. insert_doc:
    • Inserts metadata about downloaded PDFs (file path, size, etc.) into the document database table.
  6. insert_doc_span:
    • Uses the fitz library to extract textual segments ("spans") from the PDFs.
    • Stores these spans in the doc_span table, preparing the text for further parsing into debates.
  7. parse_doc:
    • Employs document parsers to analyze the text spans from doc_span.
    • Constructs structured debate objects, consisting of messages, speakers, and metadata.
    • Inserts the parsed debate information into the session and session_entry tables.
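
These steps run strictly in order, each consuming the output of the one before it. As a rough sketch, a sequential runner over the syncers might look like the following (the sync() entry point and the runner itself are assumptions for illustration; only DownloadJSONSyncer is shown later in this document):

# Hypothetical sequential runner. The sync() entry point is assumed;
# only DownloadJSONSyncer is shown in this document.
from debot.sync.syncers.base import BaseSyncer


def run_pipeline(syncer_classes: list[type[BaseSyncer]]) -> None:
    """Run each syncing step in order, each building on the previous one."""
    for syncer_cls in syncer_classes:
        syncer = syncer_cls()
        print(f"Running step: {syncer.SYNC_NAME}")
        syncer.sync()  # assumed entry point: register, combine, mark, act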

Step Breakdown

The heart of the pipeline's efficiency is the incremental load principle. Each step is designed to minimize redundant work. Let's illustrate this with the download_json step:

Registration:

  • Local JSON Registrar: Scans the local filesystem for existing JSON files, creating a registry (think of it as a table) where each row represents a local JSON file and its metadata (name, size).
  • Remote JSON Registrar: Queries the remote server, constructing a similar registry of JSON files available remotely.
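
As an illustration, a local registrar could be little more than a directory scan. The sketch below assumes registries are represented as pandas DataFrames (consistent with the merge and condition syntax shown later); the actual registrar interface is not shown in this document:

from pathlib import Path

import pandas as pd


def register_local_json(data_dir: Path) -> pd.DataFrame:
    """Build a registry of local JSON files: one row per file (illustrative)."""
    records = [
        {"local.json::fname": path.name, "local.json::size": path.stat().st_size}
        for path in sorted(data_dir.glob("*.json"))
    ]
    registry = pd.DataFrame(records, columns=["local.json::fname", "local.json::size"])
    registry["local.json::present"] = True  # every scanned file exists locally
    return registry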

An example of these registries could be:

remote.json:

remote.json::fname  remote.json::size  remote.json::present
1.json              4096               True
2.json              8192               True
3.json              16384              True

local.json:

local.json::fname  local.json::size  local.json::present
1.json             4096              True
2.json             134               True
4.json             1345              True

Each registry is constructed in such a way that a single row describes a single "entity", in this case a document.

Combining Registries:

  • The local and remote registries are merged, aligning JSON files precisely based on their unique identifiers (eids). This ensures the "one entity, one row" structure is maintained in the combined registry, with all information about the same document consolidated into a single row.
  • The combined view immediately reveals any new, deleted, or modified JSON files. We can quickly determine actions like downloading a missing document or updating local versions that are out of sync with their remote counterparts.
  • In this example, we merge the registries on the local.json::fname and remote.json::fname columns, since the filename uniquely identifies a document on each side and thus serves as the eid.

The combined registry would thus look like:

local.json::fname  remote.json::fname  local.json::size  remote.json::size  local.json::present  remote.json::present
1.json             1.json              4096              4096               True                 True
2.json             2.json              134               8192               True                 True
<EMPTY>            3.json              <EMPTY>           16384              False                True
4.json             <EMPTY>             1345              <EMPTY>            True                 False
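
Conceptually, this combination is a plain outer merge. The minimal pandas sketch below reproduces the example above (the pipeline itself expresses the merge through its JSON-representable instruction syntax, shown later in Code organization):

import pandas as pd

local = pd.DataFrame({
    "local.json::fname": ["1.json", "2.json", "4.json"],
    "local.json::size": [4096, 134, 1345],
    "local.json::present": [True, True, True],
})
remote = pd.DataFrame({
    "remote.json::fname": ["1.json", "2.json", "3.json"],
    "remote.json::size": [4096, 8192, 16384],
    "remote.json::present": [True, True, True],
})

# Outer merge keeps entities that exist on only one side ("one entity, one row").
combined = local.merge(
    remote,
    how="outer",
    left_on="local.json::fname",
    right_on="remote.json::fname",
)
# Missing sides come back as NaN; normalize the presence flags to booleans.
for col in ["local.json::present", "remote.json::present"]:
    combined[col] = combined[col].fillna(False).astype(bool)
print(combined)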

By combining the registries of the resources to be synced, we can easily determine which actions still need to take place. In this example, it's immediately obvious that:

No Action Needed
Files like 1.json remain unchanged since they're present in both locations with matching sizes.
Update Incomplete
2.json requires a re-download due to mismatched file sizes.
Download Missing
3.json exists remotely but is missing locally, so it needs to be downloaded.
Delete Outdated
4.json needs to be removed locally, as it no longer exists on the remote server.

Marking Actions:

  • We use the combined registry to programmatically mark the actions that still need to be performed for each document, such as:
  • "to_refresh" (present in both places but with differing file sizes, i.e. the local copy is incomplete)
  • "to_download" (present remotely but not locally, or marked for refresh)
  • "to_remove" (present locally but not remotely, or marked for refresh, since a refresh is realized as a removal plus a fresh download)

The result of this step is a boolean column for each potential action, added to the combined registry. The column denotes whether we need to execute that action for the entity corresponding to that row:

to_remove  to_download  to_refresh  local.json::fname  remote.json::fname  local.json::size  remote.json::size  local.json::present  remote.json::present
False      False        False       1.json             1.json              4096              4096               True                 True
True       True         True        2.json             2.json              134               8192               True                 True
False      True         False       <EMPTY>            3.json              <EMPTY>           16384              False                True
True       False        False       4.json             <EMPTY>             1345              <EMPTY>            True                 False
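
Expressed directly in pandas, and continuing from the combined DataFrame built in the earlier sketch, the marking could look like this (the pipeline instead uses its configurable condition syntax, shown in Code organization):

# Continuing from the `combined` DataFrame of the previous sketch.
local_present = combined["local.json::present"]
remote_present = combined["remote.json::present"]
sizes_differ = combined["local.json::size"] != combined["remote.json::size"]

# Present on both sides but sizes differ: the local copy is incomplete.
combined["to_refresh"] = local_present & remote_present & sizes_differ

# A refresh is realized as a removal followed by a fresh download, so
# refreshed entities are also marked for those two actions.
combined["to_download"] = (~local_present & remote_present) | combined["to_refresh"]
combined["to_remove"] = (local_present & ~remote_present) | combined["to_refresh"]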

Performing Actions:

Using the combined registry and the marked action columns, the actions are then performed for the relevant entities, in this case documents.
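
For illustration, the dispatch could look like the sketch below, assuming each action name has a matching to_<action> boolean column and a syncer method of the same name (the actual BaseSyncer internals are not shown in this document):

def perform_actions(syncer, combined_registry):
    """Illustrative dispatch: run each marked action on its subset of rows."""
    for action in syncer.ACTIONS:  # e.g. ["remove", "download"]
        subset = combined_registry[combined_registry[f"to_{action}"]]
        if not subset.empty:
            # Action names map to syncer methods, e.g. syncer.remove(subset).
            getattr(syncer, action)(subset)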

Because registering the state of resources, combining the registries, and marking the actions are all computationally cheap, we can quickly determine which actions still need to be taken, and perform only those.

Configuration vs. Code

The design of our syncing pipeline prioritizes simplicity and efficiency by emphasizing configuration over coding. For example, instead of manually coding complex pandas operations for registry combinations, we use a more straightforward, JSON-representable custom syntax. This approach not only makes the pipeline easier to adjust and maintain but also speeds up the setup process for new data or changes. The same principle applies to the action marking process, which is also configurable, making the system more flexible and adaptable to different types of data or changes in processing requirements.
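
To make this concrete, the merge instructions and action conditions shown later in this document could plausibly round-trip to JSON-serializable structures along these lines (a hypothetical serialization for illustration; the actual schema is not shown here):

# Hypothetical JSON-serializable forms; the actual schema used by debot
# is not shown in this document.
MERGE_INSTRUCTIONS_JSON = {
    "op": "outer_merge",
    "left": "local.json",
    "right": "remote.json",
    "left_on": "local.json::eid",
    "right_on": "remote.json::eid",
}

TO_REFRESH_CONDITION_JSON = {
    "op": "and",
    "args": [
        {"op": "and", "args": ["local.json::present", "remote.json::present"]},
        {"op": "not_equal", "args": ["local.json::size", "remote.json::size"]},
    ],
}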

This design choice reduces repetitive coding, allowing developers to focus on the more strategic aspects of pipeline development. It also means the pipeline can be scaled or modified to accommodate new data sources or formats without a significant overhaul, which improves maintainability and helps preserve data integrity throughout the transformation process.

Code organization

Each syncing step is handled by a single syncer. The full implementation of the download_json syncer is shown below for explanatory purposes. To define a complete syncing step, one needs to provide:

SYNC_NAME
The name of the syncing step (e.g. download_json).
REGISTRAR_NAMES
The names of the registries used to construct the combined registry. In this case, local.json and remote.json.
MERGE_INSTRUCTIONS
The precise joins used to merge the registries, expressed in a custom syntax that is portable to a JSON representation.
ACTIONS
The possible actions to be executed in this syncing step, in this case remove and download. The syncer class should define a method with the same name as each action.
ACTION_CONDITIONS
The conditions, expressed in the same JSON-representable syntax, that are applied row-wise to the combined registry to mark each action.
import asyncio
from pathlib import Path

from debot.sync.actions import conditions as co
from debot.sync.decorators import sync_action
from debot.sync.registry_combiners import instructions as ins
from debot.sync.syncers.base import BaseSyncer
from debot.utils import download_files


class DownloadJSONSyncer(BaseSyncer):
    """
    Syncer to synchronize JSON data from remote to local.
    """

    SYNC_NAME = "download_json"

    REGISTRAR_NAMES = ["local.json", "remote.json"]

    MERGE_INSTRUCTIONS = ins.OuterMerge(
        "local.json",
        "remote.json",
        left_on="local.json::eid",
        right_on="remote.json::eid",
    )

    ACTIONS = ["remove", "download"]

    ACTION_CONDITIONS = {
        "to_refresh": co.And(
            co.And("local.json::present", "remote.json::present"),
            co.NotEqual("local.json::size", "remote.json::size"),
        ),
        "to_download": co.Or(
            co.And(
                co.Not("local.json::present"),
                "remote.json::present",
            ),
            "to_refresh",
        ),
        "to_remove": co.Or(
            co.And(
                "local.json::present",
                co.Not("remote.json::present"),
            ),
            "to_refresh",
        ),
    }

    @sync_action
    def download(self, registry_df):
        """
        Process the downloading action for files specified in the given DataFrame.

        Args:
            registry_df (pandas.DataFrame): A data frame subset for records marked for download.
        """
        download_urls = registry_df["remote.json::url"]
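        # Dividing a Path by a pandas Series broadcasts element-wise,
        # yielding a Series of target paths (one per file to download).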
        target_fpaths = (
            self.name2registrar["local.json"].DATA_DIR
            / registry_df["remote.json::fname"]
        )
        url__path_pairs = list(zip(download_urls, target_fpaths))
        successes = asyncio.run(download_files(url__path_pairs))
        if sum(successes) != len(successes):
            print(
                "Failed to download {} files.".format(
                    len(successes) - sum(successes)
                )
            )

    @sync_action
    def remove(self, registry_df):
        """
        Process the removal action for files specified in the given DataFrame.

        Args:
            registry_df (pandas.DataFrame): A data frame subset for records marked for removal.
        """
        for file_path in registry_df["local.json::path"]:
            try:
                Path(file_path).unlink()
                print(f"Removed {file_path}")
            except FileNotFoundError:
                print(f"File {file_path} not found, skipping...")

Conclusion

The design of this syncing pipeline embodies a strategic approach to data processing, emphasizing developer efficiency, reproducibility, and system scalability. By treating the pipeline as a cache for each step of the transformation process, from remote document retrieval to the parsing of debates, it allows developers to build and iterate on the pipeline incrementally. This method minimizes redundant work and facilitates easy rebuilding from specific points of truth when necessary. Additionally, the pipeline's consistent structure and the portability of its JSON configurations enhance its maintainability, particularly for large-scale projects. This design philosophy ensures that the pipeline not only manages data transformations efficiently but also adapts seamlessly to changes in source data, maintaining its integrity and utility over time.