Multithreading and Multiprocessing

This section outlines how to best take advantage of multithreading and multiprocessing when using hictkpy.

TLDR

  • Never share file handles, pixel selectors, or iterators between Python threads or processes.

  • Using multithreading when fetching interactions from Cooler files rarely yields performance improvements. This is due to limitations of the HDF5 library itself.

  • When using multiprocessing on Linux, avoid using fork as the method to start new processes. When using forkserver, avoid pre-loading the hictkpy library with multiprocessing.set_forkserver_preload().

  • Free-threaded Python is supported only in the sense that wheels can be built for free-threaded interpreters; the resulting wheels still rely on the GIL.
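One way to follow the first rule above is to give each thread its own handle through threading.local. The following is a minimal sketch, not part of hictkpy: get_handle, FakeHandle, and handle_is_private are hypothetical names, and FakeHandle is a stand-in so that the snippet runs without any data file. In real code, the opener would be a callable that opens the file with hictkpy.

```python
import threading

# Per-thread storage: every thread sees its own, initially empty, namespace
_local = threading.local()


def get_handle(path, opener):
    """Return a handle for `path` that is private to the calling thread.

    `opener` is any callable taking a path and returning a handle.
    Handles are cached per thread and are not closed by this sketch.
    """
    handles = getattr(_local, "handles", None)
    if handles is None:
        handles = _local.handles = {}
    if path not in handles:
        handles[path] = opener(path)
    return handles[path]


class FakeHandle:
    """Hypothetical stand-in for a real file handle; records the opening thread."""

    def __init__(self, path):
        self.path = path
        self.owner = threading.get_ident()


def handle_is_private(path):
    """Check that the calling thread gets a handle it opened itself, and reuses it."""
    handle = get_handle(path, FakeHandle)
    same_thread = handle.owner == threading.get_ident()
    reused = handle is get_handle(path, FakeHandle)
    return same_thread and reused
```

Opening the file inside each task, as done in the examples at the end of this section, achieves the same isolation with less bookkeeping; caching per thread only pays off when opening the file is expensive relative to the query.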

Multithreading

Generally speaking, multithreading in Python cannot speed up compute-bound operations because of the global interpreter lock (GIL). However, the core of hictkpy is written in C++ and interacts with Python through the C API, which allows it to release the GIL during most long-running operations, such as fetching pixels from Cooler or .hic files. As a result, applications can achieve concurrency through multithreading.
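The effect of releasing the GIL can be illustrated without hictkpy. In the sketch below, time.sleep() is used purely as a stand-in for a long-running native call, since it also releases the GIL while waiting. The function names are hypothetical and the timings are only indicative.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def gil_releasing_task(seconds):
    # time.sleep() releases the GIL while waiting, so other threads can run.
    # It stands in for a native call that releases the GIL.
    time.sleep(seconds)
    return seconds


def run_concurrently(ntasks, seconds):
    """Run `ntasks` GIL-releasing tasks on `ntasks` threads and time them."""
    t0 = time.perf_counter()
    with ThreadPoolExecutor(ntasks) as tpool:
        results = list(tpool.map(gil_releasing_task, [seconds] * ntasks))
    return results, time.perf_counter() - t0
```

Because the tasks overlap, running four 0.25-second tasks this way should take roughly the time of a single task rather than one second. A pure-Python, compute-bound task would not overlap in the same way.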

Furthermore, certain operations, such as creating .hic files, can natively take advantage of multicore CPUs by using multithreading. These threads are C++ threads and are completely independent from Python and the GIL.

It should be noted that within-process concurrency is limited when processing Cooler files, as HDF5, the C library used for low-level IO, makes heavy use of global state that is either not thread-safe or is protected by a global mutex.

Multiprocessing

In a multiprocessing environment, hictkpy behaves like any other Python library. The only area requiring special attention is logging. On Linux, logging is not supported when fork is used as the process start method, and forking causes a UserWarning like the following to be issued:

/usr/lib64/python3.14/multiprocessing/popen_fork.py:70: UserWarning: hictkpy: detected a call to fork():
hictkpy's logger does not support multiprocessing when using fork() as start method.
Please change process start method to spawn or forkserver.
For more details, refer to Python's documentation:
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_start_method
  self.pid = os.fork()
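As an alternative to changing the start method globally, the standard library allows a start method to be bound to a single pool through a context object. The sketch below uses only multiprocessing.get_context() and the mp_context parameter of concurrent.futures.ProcessPoolExecutor; spawn_context and make_pool are hypothetical helper names.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def spawn_context():
    # get_context() returns a context bound to one start method
    # and leaves the process-wide default untouched
    return mp.get_context("spawn")


def make_pool(max_workers, initializer=None):
    """Create a process pool whose workers are started with spawn."""
    return ProcessPoolExecutor(
        max_workers,
        mp_context=spawn_context(),
        initializer=initializer,
    )
```

This keeps the choice of start method local to the code that needs it, which can be convenient in libraries that should not alter global state. Functions submitted to such a pool must be importable by the child processes, so scripts should keep their entry point under an if __name__ == "__main__": guard.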

When using multiprocessing, processing Cooler files is not affected by the limitations described in the previous section.
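Taken together, these points suggest preferring processes for Cooler files and threads for .hic files. The sketch below encodes that rule of thumb. It is a heuristic based on file extensions only, which is an assumption of this example rather than something hictkpy does, and preferred_executor is a hypothetical name.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from pathlib import Path

# Extensions conventionally used for Cooler-based formats (assumption:
# files are named according to convention)
COOLER_SUFFIXES = {".cool", ".mcool", ".scool"}


def preferred_executor(path):
    """Return the executor class likely to scale better for the given file."""
    # Cooler URIs may carry a group suffix, e.g. file.mcool::/resolutions/1000
    file_path = str(path).split("::", 1)[0]
    suffix = Path(file_path).suffix.lower()

    if suffix in COOLER_SUFFIXES:
        # HDF5 limits within-process concurrency, so use separate processes
        return ProcessPoolExecutor
    return ThreadPoolExecutor
```

The returned class can then be instantiated as usual, for example preferred_executor(path)(max_workers=4). When a process pool is selected on Linux, the advice on start methods given above still applies.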

Example (multithreading)

This example shows how to correctly use hictkpy.File() with concurrent.futures.ThreadPoolExecutor:

import logging
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import hictkpy
import pandas as pd


def fetch_chroms(path):
    """
    Get the list of chromosomes available in the given file
    """
    with hictkpy.MultiResFile(path) as f:
        return list(f.chromosomes().keys())


def fetch_pixels(path, resolution, query):
    """
    Fetch interactions for the given query and return them as a pandas.DataFrame
    """
    with hictkpy.File(path, resolution) as f:
        logging.info("[%s; TID=%d]: fetching...", query, threading.get_native_id())
        df = f.fetch(query, join=True).to_df()
        logging.info("[%s; TID=%d]: fetched %d interactions!", query, threading.get_native_id(), len(df))

        return df


def fetch_cis_interactions(path, resolution, nthreads):
    """
    Fetch cis interactions from the given file and return them as a pandas.DataFrame
    """
    chroms = fetch_chroms(path)

    with ThreadPoolExecutor(nthreads) as tpool:
        tasks = []
        for chrom in chroms:
            tasks.append(tpool.submit(fetch_pixels, path, resolution, chrom))

        results = (task.result() for task in tasks)

        return pd.concat((df for df in results if len(df) != 0))


def setup_logger(level=logging.INFO):
    fmt = "[%(asctime)s] %(levelname)s: %(message)s"
    logging.basicConfig(format=fmt)
    logging.getLogger().setLevel(level)

    # suppress log messages generated by hictkpy for level INFO or lower
    hictkpy.logging.setLevel(logging.WARN)


def main():
    setup_logger()

    path = "test/data/hic_test_file.hic"
    resolution = 100_000

    t0 = time.time()
    df = fetch_cis_interactions(path, resolution, nthreads=os.cpu_count())
    print(df, file=sys.stderr)

    logging.info("fetched %d interactions in %.2fs!", len(df), time.time() - t0)


if __name__ == "__main__":
    main()

Example (multiprocessing)

Using hictkpy.File() handles with concurrent.futures.ProcessPoolExecutor is almost identical to the previous example. The only differences are:

  • Explicitly set the process start method to something other than fork (only required on Linux).

  • Initialize the logger in each child process by passing initializer=setup_logger to concurrent.futures.ProcessPoolExecutor.

import logging
import multiprocessing as mp
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor

import hictkpy
import pandas as pd


def fetch_chroms(path):
    """
    Get the list of chromosomes available in the given file
    """
    with hictkpy.MultiResFile(path) as f:
        return list(f.chromosomes().keys())


def fetch_pixels(path, resolution, query):
    """
    Fetch interactions for the given query and return them as a pandas.DataFrame
    """
    with hictkpy.File(path, resolution) as f:
        logging.info("[%s; PID=%d]: fetching...", query, os.getpid())
        df = f.fetch(query, join=True).to_df()
        logging.info("[%s; PID=%d]: fetched %d interactions!", query, os.getpid(), len(df))

        return df


def fetch_cis_interactions(path, resolution, nthreads):
    """
    Fetch cis interactions from the given file and return them as a pandas.DataFrame
    """
    chroms = fetch_chroms(path)

    with ProcessPoolExecutor(nthreads, initializer=setup_logger) as ppool:
        tasks = []
        for chrom in chroms:
            tasks.append(ppool.submit(fetch_pixels, path, resolution, chrom))

        results = (task.result() for task in tasks)

        return pd.concat((df for df in results if len(df) != 0))


def setup_logger(level=logging.INFO):
    fmt = "[%(asctime)s] %(levelname)s: %(message)s"
    logging.basicConfig(format=fmt)
    logging.getLogger().setLevel(level)

    # suppress log messages generated by hictkpy for level INFO or lower
    hictkpy.logging.setLevel(logging.WARN)


def main():
    mp.set_start_method("spawn")
    setup_logger()

    path = "test/data/hic_test_file.hic"
    resolution = 100_000

    t0 = time.time()
    df = fetch_cis_interactions(path, resolution, nthreads=os.cpu_count())
    print(df, file=sys.stderr)

    logging.info("fetched %d interactions in %.2fs!", len(df), time.time() - t0)


if __name__ == "__main__":
    main()