Setting the Stage for Opioid Data Exploration: Importing and Preparing ARCOS Records

In my current research project, I am delving into the complex world of opioid distribution in the United States by analyzing the ARCOS (Automation of Reports and Consolidated Orders System) database, maintained by the DEA. This database tracks controlled pharmaceuticals from manufacture through commercial distribution to the point of sale, offering a comprehensive view of drug flow. I’m specifically looking at the two most commonly prescribed opioids in the US: Hydrocodone and Oxycodone.

In this blog post, I’ll share the initial steps taken in setting up the project, focusing on data handling and integration into our analysis tools.

Understanding ARCOS

The ARCOS database is a critical resource for anyone studying controlled substance distribution. It includes detailed transaction records across the U.S., making it invaluable for identifying distribution patterns and potential misuse. The sheer volume and complexity of the data demand robust data management: the dataset contains over 2 billion records and takes up more than 100 GB of disk space in my PostgreSQL database.

Data Acquisition and Transfer

The first step in working with ARCOS was acquiring the dataset, which is provided as a large TSV (tab-separated values) file. I obtained the dataset, filtered to Hydrocodone and Oxycodone, from The Washington Post. Given the size of the TSV file (105 GB), transferring it efficiently to my SQL server was crucial. For this, I used rsync.

rsync -avzP arcos_all_washpost.tsv username@server:/path/to/destination/arcos_all_washpost.tsv
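
The -a flag preserves file attributes, -v prints each file as it transfers, -z compresses data in transit, and -P keeps partial transfers and shows progress, so a dropped connection doesn't mean restarting the 105 GB copy from scratch.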

Data Integration into the Database

Once the TSV file was securely transferred to the SQL server, the next challenge was integrating the data into our PostgreSQL database. For this task, PostgreSQL’s COPY command, run from psql, proved indispensable. It bulk-loads data directly from a file on the server into a table, which is significantly faster than inserting records one at a time.

First, I created the table in Postgres:

CREATE TABLE IF NOT EXISTS pain_pills.arcos_all_washpost
(
    id integer NOT NULL DEFAULT nextval('pain_pills.arcos_all_washpost_id_seq'::regclass),
    "REPORTER_DEA_NO" character varying COLLATE pg_catalog."default",
    "REPORTER_BUS_ACT" character varying COLLATE pg_catalog."default",
    "REPORTER_NAME" character varying COLLATE pg_catalog."default",
    "REPORTER_ADDL_CO_INFO" character varying COLLATE pg_catalog."default",
    "REPORTER_ADDRESS1" character varying COLLATE pg_catalog."default",
    "REPORTER_ADDRESS2" character varying COLLATE pg_catalog."default",
    "REPORTER_CITY" character varying COLLATE pg_catalog."default",
    "REPORTER_STATE" character varying COLLATE pg_catalog."default",
    "REPORTER_ZIP" character varying COLLATE pg_catalog."default",
    "REPORTER_COUNTY" character varying COLLATE pg_catalog."default",
    "BUYER_DEA_NO" character varying COLLATE pg_catalog."default",
    "BUYER_BUS_ACT" character varying COLLATE pg_catalog."default",
    "BUYER_NAME" character varying COLLATE pg_catalog."default",
    "BUYER_ADDL_CO_INFO" character varying COLLATE pg_catalog."default",
    "BUYER_ADDRESS1" character varying COLLATE pg_catalog."default",
    "BUYER_ADDRESS2" character varying COLLATE pg_catalog."default",
    "BUYER_CITY" character varying COLLATE pg_catalog."default",
    "BUYER_STATE" character varying COLLATE pg_catalog."default",
    "BUYER_ZIP" character varying COLLATE pg_catalog."default",
    "BUYER_COUNTY" character varying COLLATE pg_catalog."default",
    "TRANSACTION_CODE" character varying COLLATE pg_catalog."default",
    "DRUG_CODE" character varying COLLATE pg_catalog."default",
    "NDC_NO" character varying COLLATE pg_catalog."default",
    "DRUG_NAME" character varying COLLATE pg_catalog."default",
    "Measure" character varying COLLATE pg_catalog."default",
    "MME_Conversion_Factor" double precision,
    "Dosage_Strength" double precision,
    "TRANSACTION_DATE" date,
    "Combined_Labeler_Name" character varying COLLATE pg_catalog."default",
    "Reporter_family" character varying COLLATE pg_catalog."default",
    "CALC_BASE_WT_IN_GM" double precision,
    "DOSAGE_UNIT" double precision,
    "MME" double precision,
    CONSTRAINT arcos_all_washpost_pkey PRIMARY KEY (id)
);
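
One note on the table definition: the id column's DEFAULT references a sequence that the statement above doesn't create. Assuming the schema and sequence don't already exist, something like the following has to run first:

CREATE SCHEMA IF NOT EXISTS pain_pills;
-- Sequence backing the id column's DEFAULT nextval(...) in the table definition
CREATE SEQUENCE IF NOT EXISTS pain_pills.arcos_all_washpost_id_seq;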

Then, I ran psql to log into the Postgres database:

psql -U <username> -d <database>

You’ll be prompted for your database password. Once connected, I ran the COPY command, pointing it at the server-side path the file was rsynced to:

COPY <schema>.arcos_all_washpost (
    "REPORTER_DEA_NO", "REPORTER_BUS_ACT", "REPORTER_NAME", "REPORTER_ADDL_CO_INFO", "REPORTER_ADDRESS1",
    "REPORTER_ADDRESS2", "REPORTER_CITY", "REPORTER_STATE", "REPORTER_ZIP", "REPORTER_COUNTY",
    "BUYER_DEA_NO", "BUYER_BUS_ACT", "BUYER_NAME", "BUYER_ADDL_CO_INFO", "BUYER_ADDRESS1",
    "BUYER_ADDRESS2", "BUYER_CITY", "BUYER_STATE", "BUYER_ZIP", "BUYER_COUNTY",
    "TRANSACTION_CODE", "DRUG_CODE", "NDC_NO", "DRUG_NAME", "Measure",
    "MME_Conversion_Factor", "Dosage_Strength", "TRANSACTION_DATE", "Combined_Labeler_Name",
    "Reporter_family", "CALC_BASE_WT_IN_GM", "DOSAGE_UNIT", "MME"
)
FROM '<file_directory>/arcos_all_washpost.tsv'
WITH (FORMAT csv, DELIMITER E'\t', HEADER);

I ran this overnight as I expected it to take quite a long time.
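
A quick row count afterward is an easy way to confirm the load completed and roughly matches the record counts mentioned above:

SELECT COUNT(*) FROM <schema>.arcos_all_washpost;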

Geocoding and Data Preparation

With the data in the database, the next phase involved geocoding: converting addresses into geographic coordinates. This step is essential for the spatial analysis to come, which will let us visualize distribution patterns and run geographically aggregated analyses. Using PostGIS, an extension for PostgreSQL that supports geographic objects, I started processing the addresses from the ARCOS data.
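
The geocode() function used below comes from the PostGIS TIGER geocoder, so this assumes the postgis and postgis_tiger_geocoder extensions are installed and the TIGER/Line address data has been loaded. As a rough illustration (the address here is just an example), a single call looks like this:

-- Enable PostGIS and the TIGER geocoder if not already present (TIGER data must be loaded separately)
CREATE EXTENSION IF NOT EXISTS postgis;
CREATE EXTENSION IF NOT EXISTS postgis_tiger_geocoder CASCADE;
-- Geocode one address and return its best match with a quality rating
SELECT g.rating, ST_X(g.geomout) AS longitude, ST_Y(g.geomout) AS latitude
FROM geocode('1600 Pennsylvania Ave NW, Washington, DC 20500', 1) AS g;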

I created a Postgres function to do the bulk geocoding. But first, the dblink extension must be enabled, since it lets the function push each update through a separate connection that commits independently of the function's own transaction:

CREATE EXTENSION dblink;

Then, create the buyer_addresses table to store the addresses and geocode results:

CREATE TABLE IF NOT EXISTS pain_pills.buyer_addresses
(
    id integer NOT NULL,
    buyer_dea_no text COLLATE pg_catalog."default",
    address text COLLATE pg_catalog."default",
    city character varying COLLATE pg_catalog."default",
    state character varying COLLATE pg_catalog."default",
    zip character varying COLLATE pg_catalog."default",
    latitude double precision,
    longitude double precision,
    geocode_rating double precision,
    geom geometry(Geometry,4269),
    CONSTRAINT buyer_addresses_pkey PRIMARY KEY (id)
);
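
Populating buyer_addresses is just a matter of pulling the distinct buyer addresses out of the ARCOS table; a sketch of that step (assuming one row per distinct buyer address, with a synthetic id) might look like:

INSERT INTO pain_pills.buyer_addresses (id, buyer_dea_no, address, city, state, zip)
SELECT (ROW_NUMBER() OVER (ORDER BY "BUYER_DEA_NO"))::integer AS id,
       "BUYER_DEA_NO",
       CONCAT_WS(' ', "BUYER_ADDRESS1", "BUYER_ADDRESS2") AS address,
       "BUYER_CITY", "BUYER_STATE", "BUYER_ZIP"
FROM (
    SELECT DISTINCT "BUYER_DEA_NO", "BUYER_ADDRESS1", "BUYER_ADDRESS2",
                    "BUYER_CITY", "BUYER_STATE", "BUYER_ZIP"
    FROM pain_pills.arcos_all_washpost
) AS buyers;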

Next, we create the function:

CREATE OR REPLACE FUNCTION update_geocoded_addresses()
RETURNS void AS $$
DECLARE
    r record;
    geo_result record;
    count integer := 0;  -- Total updates attempted
    commit_count integer := 0;  -- Number of committed updates
    total_records integer := 0;  -- Total records to be processed
    conn text;
BEGIN
    -- Establish a connection string, adjust credentials as necessary
    conn := 'dbname=<database> user=<username> password=<password>';
    -- First, count the total records needing update
    SELECT COUNT(*) INTO total_records
    FROM pain_pills.buyer_addresses
    WHERE geom IS NULL;
    -- Open a cursor for records needing updates
    FOR r IN 
        SELECT id, CONCAT(address, ', ', city, ', ', state, ', ', zip) AS full_address
        FROM pain_pills.buyer_addresses
        WHERE geom IS NULL
    LOOP
        -- Perform geocoding
        geo_result := geocode(r.full_address, 1);
        -- Check if geocode result is not null and update
        IF geo_result IS NOT NULL THEN
            PERFORM dblink_connect(conn);
            PERFORM dblink_exec(format('UPDATE pain_pills.buyer_addresses SET latitude = %s, longitude = %s, geom = %L, geocode_rating = %s WHERE id = %s',
                                        ST_Y(geo_result.geomout),
                                        ST_X(geo_result.geomout),
                                        geo_result.geomout,
                                        geo_result.rating,
                                        r.id));
            PERFORM dblink_disconnect();
            count := count + 1;
            -- Give notice every 25 geocodes
            IF count % 25 = 0 THEN
                RAISE NOTICE 'Processed 25 geocodes, last updated ID: %', r.id;
            END IF;
            -- Commit and give notice every 500 updates
            IF count % 500 = 0 THEN
                PERFORM dblink_connect(conn);
                PERFORM dblink_exec('COMMIT;');
                PERFORM dblink_disconnect();
                commit_count := commit_count + 500;
                RAISE NOTICE 'Committed 500 updates, total committed: % out of %', commit_count, total_records;
            END IF;
        ELSE
            RAISE NOTICE 'Failed to geocode ID %: %', r.id, r.full_address;
        END IF;
    END LOOP;
    -- Ensure any remaining transactions are committed
    IF count % 500 != 0 THEN
        PERFORM dblink_connect(conn);
        PERFORM dblink_exec('COMMIT;');
        PERFORM dblink_disconnect();
        commit_count := commit_count + (count % 500);
        RAISE NOTICE 'Final commit for remaining updates: %, total committed: % out of %', (count % 500), commit_count, total_records;
    END IF;
END;
$$ LANGUAGE plpgsql;
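
One nice property of the dblink approach: each UPDATE is committed on its own connection as soon as it runs, so if the function is interrupted partway through, the rows already geocoded keep their geometries and the WHERE geom IS NULL filter makes it safe to simply re-run.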

Finally, we run it:

SELECT update_geocoded_addresses();

I estimated this would take around 3.5 days on my server, but my SQL database is stored on spinning rust drives. If you are using SSDs, you can expect it to be faster.
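
Since those updates land as the function works, progress can also be checked from another session with a quick query like:

-- Rough progress check while the geocoding function is running
SELECT COUNT(*) FILTER (WHERE geom IS NOT NULL) AS geocoded,
       COUNT(*) AS total
FROM pain_pills.buyer_addresses;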

Looking Forward

The next steps involve completing the geocoding process, analyzing the geographically aggregated data, and beginning an in-depth analysis of opioid distribution patterns.
