API Reference

Catalog utilities for the LSST pipeline project.

This module provides the Catalog class for managing Parquet files: creating them, appending rows, and combining temporary files into a single output. It enforces schemas and keeps memory use low by processing data in batches and managing temporary files, so it can handle large datasets. It is designed for microlensing simulations, such as those based on LSST DP0 or DP1 data, where it stores event and photometry data.

Classes

Catalog : Manages Parquet file operations with schema enforcement and batch processing.

class ulens_lsst.catalogs_utils.Catalog(file_path, schema=None)

Bases: object

Class for managing Parquet files with schema enforcement and batch processing.

Provides methods to create, append, combine, and query Parquet files, optimized for memory efficiency through chunked processing and temporary file management. Supports schema validation and is designed for use in parallel processing pipelines.

file_path

Path to the Parquet file.

Type:

str

schema

Schema for the Parquet file.

Type:

pyarrow.Schema, optional

logger

Logger used for debug and status messages.

Type:

logging.Logger

create(schema=None, overwrite=False)

Create an empty Parquet file with the specified schema.

Parameters:
  • schema (Dict[str, pyarrow.DataType], optional) – Schema for the Parquet file as a dictionary mapping column names to types; uses self.schema if None.

  • overwrite (bool, optional) – Overwrite the file if it exists (default: False).

Raises:
  • ValueError – If schema is not provided and self.schema is None.

  • OSError – If file creation fails.

add_rows(rows, mode='append', schema=None)

Add rows to the Parquet file, handling temporary files safely under concurrent access.

Acquires a lock file before writing so that parallel workers cannot write at the same time, which keeps appends to the Parquet file from corrupting it.

Parameters:
  • rows (List[Dict[str, Any]]) – List of dictionaries containing row data.

  • mode (str, optional) – Mode for adding rows: 'append' or 'overwrite' (default: 'append').

  • schema (Union[Dict[str, pyarrow.DataType], pyarrow.Schema], optional) – Schema for the Parquet file; uses self.schema if None.

Raises:
  • ValueError – If mode is invalid, rows are empty, or schema is missing.

  • OSError – If file writing or lock acquisition fails.
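The lock-file pattern described above can be sketched with the standard library alone. The helper name file_lock, the ".lock" suffix, and the timeout and polling values are assumptions for illustration, not the method's actual internals. Opening the lock with os.O_CREAT | os.O_EXCL fails if the file already exists, so on a local filesystem at most one process holds the lock at a time; behavior on network filesystems may differ.

```python
import contextlib
import os
import time
from typing import Iterator


@contextlib.contextmanager
def file_lock(target_path: str, timeout: float = 10.0, poll: float = 0.05) -> Iterator[str]:
    """Hold an exclusive '<target_path>.lock' file for the duration of a with-block (sketch)."""
    lock_path = target_path + ".lock"
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_CREAT | O_EXCL raises FileExistsError if another process
            # already created the lock file.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                # TimeoutError is a subclass of OSError, matching "lock acquisition fails".
                raise TimeoutError(f"Timed out waiting for lock {lock_path}")
            time.sleep(poll)
    try:
        # Record the owner PID to help diagnose stale locks.
        os.write(fd, str(os.getpid()).encode())
        yield lock_path
    finally:
        os.close(fd)
        os.remove(lock_path)
```

A writer would wrap its read-append-write step in `with file_lock(path): ...`; the finally clause releases the lock even if the write raises.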

combine_parquet_files(temp_dir, schema, batch_size=1000, columns=None, cleanup=True)

Combine temporary Parquet files into a single final Parquet file, handling column order and type mismatches.

Parameters:
  • temp_dir (str) – Directory containing temporary Parquet files.

  • schema (Optional[Union[List[Tuple[str, pa.DataType]], Dict[str, pa.DataType]]]) – Schema for the output file. Required if the final output file does not exist or is invalid.

  • batch_size (int, optional) – Number of rows to process at a time (default: 1000).

  • columns (Optional[List[str]], optional) – Columns to read from temporary files (default: None, which reads all columns).

  • cleanup (bool, optional) – Delete temporary files after combining (default: True).

Raises:

ValueError – If no valid temporary files are found or schema is required but not provided.

Notes

This method processes data in batches to limit memory use, enforces a consistent column order across input files, and resolves type mismatches by promoting types (e.g., int32 to int64, float to double) and filling null columns with default values. It is designed for LSST microlensing experiments using DP0/DP1 data and rubin_sim simulations.

get_max_value(column)

Retrieve the maximum value of a specified column.

Parameters:

column (str) – Column name to query.

Returns:

Maximum value in the column; None if the file is empty or the column is missing.

Return type:

Union[int, float, None]

get_schema()

Retrieve the schema of the Parquet file.

Returns:

Schema of the Parquet file; None if the file does not exist.

Return type:

pyarrow.Schema, optional