Introduction
The core purpose of this syncing pipeline lies in establishing a structured, efficient, and self-updating representation of the raw PDF debates dataset hosted on https://llms.openstate.eu/. We aim to transform this dataset into a format conducive to finetuning language models, where debates are represented as sequences of messages associated with speakers, metadata, and relevant context.
The nature of language models necessitates a consistent structure for training data. PDF files, while containing the debate content, lack inherent organization in terms of speaker identification, debate subject, and the clear delineation of individual messages. This pipeline converts these PDFs into a model-friendly format.
Pipeline Structure
The syncing pipeline follows a carefully designed sequential process. Each step focuses on synchronizing a specific type of resource, building upon the output of previous steps. Let's outline the steps:
`download_json`
- Downloads JSON index files from the remote dataset.
- Compares these with existing local JSON files to determine which files need to be downloaded, updated, or removed.

`insert_doc_index`
- Inserts the metadata (e.g., title, type, date) contained within the JSON files into the `doc_index` database table, creating a registry of documents on the remote server.

`download_doc`
- Downloads the raw PDF files based on the information in the `doc_index` table.
- Intelligently handles updates to existing documents and removes local PDFs deleted on the remote.

`symlink_doc`
- Creates symbolic links to downloaded PDFs, sorted into folders based on `doctype_slug` (e.g., "antwoord-schriftelijke-vragen") and `Vergaderjaar` (e.g., `2017-2018`). This aids in browsing and understanding structural patterns.

`insert_doc`
- Inserts metadata about downloaded PDFs (file path, size, etc.) into the `document` database table.

`insert_doc_span`
- Uses the `fitz` library to extract textual segments ("spans") from the PDFs (see the sketch after this list).
- Stores these spans in the `doc_span` table, preparing the text for further parsing into debates.

`parse_doc`
- Employs document parsers to analyze the text spans from `doc_span`.
- Constructs structured debate objects, consisting of messages, speakers, and metadata.
- Inserts the parsed debate information into the `session` and `session_entry` tables.
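For readers unfamiliar with `fitz` (PyMuPDF), the sketch below shows what span extraction can look like. The function name and the selection of span fields are illustrative; the actual `insert_doc_span` implementation, which also writes the rows to the `doc_span` table, is not shown in this post.

```python
# Minimal sketch of span extraction with PyMuPDF (fitz). The function name and
# the chosen span fields are illustrative, not the pipeline's actual code.
import fitz  # PyMuPDF


def extract_spans(pdf_path):
    """Yield one dict per text span found in the PDF."""
    with fitz.open(pdf_path) as doc:
        for page_no, page in enumerate(doc):
            # "dict" output groups page text into blocks -> lines -> spans
            for block in page.get_text("dict")["blocks"]:
                for line in block.get("lines", []):
                    for span in line["spans"]:
                        yield {
                            "page": page_no,
                            "text": span["text"],
                            "font": span["font"],
                            "size": span["size"],
                            "bbox": span["bbox"],
                        }
```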
Step Breakdown
The heart of the pipeline's efficiency is the incremental load principle: each step is designed to minimize redundant work. Let's illustrate this with the `download_json` step:
Registration:
- Local JSON Registrar: Scans the local filesystem for existing JSON files, creating a registry (think of it as a table) where each row represents a local JSON file and its metadata (name, size).
- Remote JSON Registrar: Queries the remote server, constructing a similar registry of JSON files available remotely.
An example of these registries could be:
`remote.json`:

| remote.json::fname | remote.json::size | remote.json::present |
|---|---|---|
| 1.json | 4096 | True |
| 2.json | 8192 | True |
| 3.json | 16384 | True |

`local.json`:

| local.json::fname | local.json::size | local.json::present |
|---|---|---|
| 1.json | 4096 | True |
| 2.json | 134 | True |
| 4.json | 1345 | True |
Each registry is constructed in such a way that a single row describes a single "entity", in this case a document.
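The registrar implementations themselves aren't shown in this post, but to make the idea concrete, a local JSON registrar could be as simple as a directory scan that yields one row per file. The function and directory names below are illustrative, not the pipeline's actual code.

```python
# Hypothetical local JSON registrar: one row per file, following the
# "<registrar>::<field>" column convention used in the examples above.
from pathlib import Path

import pandas as pd


def build_local_json_registry(data_dir):
    rows = [
        {
            "local.json::fname": path.name,
            "local.json::size": path.stat().st_size,
            "local.json::present": True,
        }
        for path in sorted(Path(data_dir).glob("*.json"))
    ]
    columns = ["local.json::fname", "local.json::size", "local.json::present"]
    return pd.DataFrame(rows, columns=columns)
```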
Combining Registries:
- The local and remote registries are merged, aligning JSON files precisely based on their unique identifiers (eids). This ensures the "one entity, one row" structure is maintained in the combined registry, with all information about the same document consolidated into a single row.
- The combined view immediately reveals any new, deleted, or modified JSON files. We can quickly determine actions like downloading a missing document or updating local versions that are out of sync with their remote counterparts.
- In this example, we'd like to merge our registries on the `local.json::fname` and `remote.json::fname` columns, each of which uniquely identifies the documents on its side of the sync.
The combined registry would thus look like:

| local.json::fname | remote.json::fname | local.json::size | remote.json::size | local.json::present | remote.json::present |
|---|---|---|---|---|---|
| 1.json | 1.json | 4096 | 4096 | True | True |
| 2.json | 2.json | 134 | 8192 | True | True |
| <EMPTY> | 3.json | <EMPTY> | 16384 | False | True |
| 4.json | <EMPTY> | 1345 | <EMPTY> | True | False |
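For illustration, this combination boils down to an outer merge. In plain pandas (which the pipeline deliberately avoids hand-coding, as discussed under Configuration vs. Code below) it would look roughly like the sketch here, using the example registries; the variable names are illustrative.

```python
# Rough pandas equivalent of the configured registry combination, built from
# the example registries above. Not the pipeline's actual code.
import pandas as pd

local = pd.DataFrame(
    {
        "local.json::fname": ["1.json", "2.json", "4.json"],
        "local.json::size": [4096, 134, 1345],
        "local.json::present": [True, True, True],
    }
)
remote = pd.DataFrame(
    {
        "remote.json::fname": ["1.json", "2.json", "3.json"],
        "remote.json::size": [4096, 8192, 16384],
        "remote.json::present": [True, True, True],
    }
)

combined = local.merge(
    remote,
    how="outer",
    left_on="local.json::fname",
    right_on="remote.json::fname",
)
# Rows missing on one side come back as NaN after the outer join; normalise
# the presence flags so they read as plain booleans.
for col in ("local.json::present", "remote.json::present"):
    combined[col] = combined[col].fillna(False).astype(bool)
```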
Through combining the registries of the different resources to be synced, we can easily determine the actions that still need to take place. For example, it's immediately obvious that:
- No Action Needed: Files like `1.json` remain unchanged, since they're present in both locations with matching sizes.
- Update Incomplete: `2.json` requires a re-download due to mismatched file sizes.
- Download Missing: `3.json` exists remotely but is missing locally, so it needs to be downloaded.
- Delete Outdated: `4.json` needs to be removed locally, as it no longer exists on the remote server.
Marking Actions:
- We use the above combined registry representation to programmatically mark the actions that we still need to perform for each of the documents, such as:
  - `to_remove` (if present locally, not remotely)
  - `to_download` (if present remotely, not locally)
  - `to_refresh` (if present in both places but the file sizes differ, i.e. not downloaded completely)
The result of this step is a boolean column for each potential action, added to the combined registry. Each column denotes whether that action needs to be executed for the entity corresponding to that row:
| to_remove | to_download | to_refresh | local.json::fname | remote.json::fname | local.json::size | remote.json::size | local.json::present | remote.json::present |
|---|---|---|---|---|---|---|---|---|
| False | False | False | 1.json | 1.json | 4096 | 4096 | True | True |
| True | True | True | 2.json | 2.json | 134 | 8192 | True | True |
| False | True | False | <EMPTY> | 3.json | <EMPTY> | 16384 | False | True |
| True | False | False | 4.json | <EMPTY> | 1345 | <EMPTY> | True | False |
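In plain pandas terms, the marked columns are simply row-wise boolean expressions over the combined registry. The sketch below is a rough equivalent of the configured conditions, reusing the `combined` DataFrame from the merge sketch above; it is illustrative, not the pipeline's actual code.

```python
# Rough pandas equivalent of the configured ACTION_CONDITIONS, reusing the
# `combined` DataFrame from the merge sketch above.
both_present = combined["local.json::present"] & combined["remote.json::present"]

combined["to_refresh"] = both_present & (
    combined["local.json::size"] != combined["remote.json::size"]
)
combined["to_download"] = (
    ~combined["local.json::present"] & combined["remote.json::present"]
) | combined["to_refresh"]
combined["to_remove"] = (
    combined["local.json::present"] & ~combined["remote.json::present"]
) | combined["to_refresh"]
```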
Performing Actions:
Using the combined registry and the marked actions, the actions are performed for the relevant entities, i.e. documents in this case.
Because registering the state of resources, combining the registries, and marking the actions are computationally very cheap, we can quickly determine which actions still need to be taken, and perform only those.
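The exact dispatch mechanics live in the base syncer and aren't shown here, but conceptually this final stage subsets the combined registry per action and hands each subset to the method of the same name. A hypothetical sketch:

```python
# Conceptual dispatch only: assumes the syncer defines one method per entry in
# ACTIONS and that the combined registry has one "to_<action>" column per action.
def perform_actions(syncer, combined):
    for action in syncer.ACTIONS:                # e.g. ["remove", "download"]
        subset = combined[combined[f"to_{action}"]]
        if not subset.empty:
            getattr(syncer, action)(subset)      # e.g. syncer.remove(subset)
```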
Configuration vs. Code
The design of our syncing pipeline prioritizes simplicity and efficiency by emphasizing configuration over coding. For example, instead of manually coding complex pandas operations for registry combinations, we use a more straightforward, JSON-representable custom syntax. This approach not only makes the pipeline easier to adjust and maintain but also speeds up the setup process for new data or changes. The same principle applies to the action marking process, which is also configurable, making the system more flexible and adaptable to different types of data or changes in processing requirements.
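The exact serialization format isn't covered in this post, but to give a feel for the idea, a merge instruction and an action condition might round-trip to JSON along these lines. The shapes below are hypothetical, not the pipeline's actual format.

```python
# Hypothetical JSON-shaped rendering of the merge and condition syntax; the
# pipeline's actual serialization format may differ.
merge_instructions = {
    "outer_merge": {
        "left": "local.json",
        "right": "remote.json",
        "left_on": "local.json::eid",
        "right_on": "remote.json::eid",
    }
}

action_conditions = {
    "to_refresh": {
        "and": [
            {"and": ["local.json::present", "remote.json::present"]},
            {"not_equal": ["local.json::size", "remote.json::size"]},
        ]
    }
}
```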
This design choice significantly reduces the need for repetitive coding, allowing developers to focus on more strategic aspects of the pipeline development. It also ensures that the pipeline can easily be scaled or modified to accommodate new data sources or formats without a significant overhaul. Simplifying the process in this way enhances the overall maintainability of the pipeline and helps in ensuring data integrity throughout the transformation process. This streamlined approach is practical, focusing on what's essential for processing data effectively and efficiently.
Code organization
Each of the syncing steps is handled by a single syncer. The full implementation of the `download_json` syncer is shown below for explanatory purposes. To define a full syncing step, one needs to define the following (see the actual implementation below):
- `SYNC_NAME` - The name of the syncing step (e.g. `download_json`).
- `REGISTRAR_NAMES` - The names of the registries used to construct the combined registry. In this case, `local.json` and `remote.json`.
- `MERGE_INSTRUCTIONS` - Using a custom syntax that is portable to a JSON representation, defines the precise joins needed to merge the registries in the desired way.
- `ACTIONS` - The possible actions to be executed in this syncing step, in this case `remove` and `download`. The syncer class should define a method with the same name as each action.
- `ACTION_CONDITIONS` - Using the same JSON-portable syntax, defines the conditions to be applied row-wise on the combined registry for each action.
```python
import asyncio
from pathlib import Path

from debot.sync.actions import conditions as co
from debot.sync.decorators import sync_action
from debot.sync.registry_combiners import instructions as ins
from debot.sync.syncers.base import BaseSyncer
from debot.utils import download_files


class DownloadJSONSyncer(BaseSyncer):
    """
    Syncer to synchronize JSON data from remote to local.
    """

    SYNC_NAME = "download_json"
    REGISTRAR_NAMES = ["local.json", "remote.json"]
    MERGE_INSTRUCTIONS = ins.OuterMerge(
        "local.json",
        "remote.json",
        left_on="local.json::eid",
        right_on="remote.json::eid",
    )
    ACTIONS = ["remove", "download"]
    ACTION_CONDITIONS = {
        "to_refresh": co.And(
            co.And("local.json::present", "remote.json::present"),
            co.NotEqual("local.json::size", "remote.json::size"),
        ),
        "to_download": co.Or(
            co.And(
                co.Not("local.json::present"),
                "remote.json::present",
            ),
            "to_refresh",
        ),
        "to_remove": co.Or(
            co.And(
                "local.json::present",
                co.Not("remote.json::present"),
            ),
            "to_refresh",
        ),
    }

    @sync_action
    def download(self, registry_df):
        """
        Process the downloading action for files specified in the given DataFrame.

        Args:
            registry_df (pandas.DataFrame): A data frame subset for records marked for download.
        """
        download_urls = registry_df["remote.json::url"]
        target_fpaths = (
            self.name2registrar["local.json"].DATA_DIR
            / registry_df["remote.json::fname"]
        )
        url__path_pairs = list(zip(download_urls, target_fpaths))
        successes = asyncio.run(download_files(url__path_pairs))
        if sum(successes) != len(successes):
            print(
                "Failed to download {} files.".format(
                    len(successes) - sum(successes)
                )
            )

    @sync_action
    def remove(self, registry_df):
        """
        Process the removal action for files specified in the given DataFrame.

        Args:
            registry_df (pandas.DataFrame): A data frame subset for records marked for removal.
        """
        for file_path in registry_df["local.json::path"]:
            try:
                Path(file_path).unlink()
                print(f"Removed {file_path}")
            except FileNotFoundError:
                print(f"File {file_path} not found, skipping...")
```
Conclusion
The design of this syncing pipeline embodies a strategic approach to data processing, emphasizing developer efficiency, reproducibility, and system scalability. By treating the pipeline as a cache for each step of the transformation process—from remote document retrieval to the parsing of debates—it allows developers to build and iterate on the pipeline incrementally. This method minimizes redundant work and facilitates easy rebuilding from specific points of truth when necessary. Additionally, the pipeline's structured, step-wise design and the portability of its JSON configurations enhance its maintainability, particularly for large-scale projects. This design philosophy ensures that the pipeline not only efficiently manages data transformations but also adapts seamlessly to changes in source data, maintaining its integrity and utility over time.