Data integrity is crucial when managing databases, and situations such as production database corruption make a robust backup strategy essential. This article describes a PostgreSQL script, developed out of necessity, that generates SQL statements for an incremental backup from a specific point in time. Applying these statements to a previous backup restores the database to its most recent, accurate state.
Overview of the Script
The script is implemented as a PostgreSQL function named generate_sql_statements. Its primary purpose is to scan specified database tables for changes (new inserts or updates) that occurred after a given timestamp (reference_timestamp). It generates corresponding SQL statements to reflect these changes, allowing the user to recreate the database's state incrementally.
CREATE OR REPLACE FUNCTION generate_sql_statements(table_names TEXT[], reference_timestamp TIMESTAMPTZ)
RETURNS TABLE (sql_statement TEXT) AS $$
DECLARE
    input_table_name TEXT;
    column_record RECORD;
    action_type TEXT;
    row_json JSONB;
    formatted_value TEXT;
    formatted_values TEXT;
    columns TEXT;
    update_set_clause TEXT;
BEGIN
    -- Loop through each table name
    FOREACH input_table_name IN ARRAY table_names
    LOOP
        -- Step 1: Column information is read from information_schema.columns in Step 3.
        -- It must not wrap the row loop, or every statement would be emitted once per column.
        -- Step 2: Identify rows for INSERT or UPDATE and convert rows to JSON
        FOR row_json, action_type IN
            EXECUTE format('
                SELECT to_jsonb(cf), CASE
                    WHEN cf."CreatedOn" > $1 THEN ''INSERT''
                    WHEN cf."UpdatedOn" > $1 AND cf."CreatedOn" <= $1 THEN ''UPDATE''
                    ELSE NULL
                END
                FROM %I cf
                WHERE cf."CreatedOn" > $1 OR cf."UpdatedOn" > $1', input_table_name)
            USING reference_timestamp
        LOOP
            -- Step 3: Generate formatted column values for SQL statements
            formatted_values := '';
            columns := '';
            update_set_clause := '';
            FOR column_record IN
                SELECT column_name, data_type, ordinal_position
                FROM information_schema.columns
                WHERE table_schema = 'public' AND table_name = input_table_name
                ORDER BY ordinal_position
            LOOP
                -- quote_literal escapes the value for the generated statement; REPLACE then
                -- doubles every quote again because Step 4 wraps the statement in a string literal.
                formatted_value := CASE
                    WHEN (row_json -> column_record.column_name) = 'null' THEN 'NULL'
                    WHEN column_record.data_type IN ('character varying', 'text', 'date', 'timestamp without time zone', 'timestamp with time zone') THEN
                        REPLACE(quote_literal(row_json ->> column_record.column_name), '''', '''''')
                    WHEN column_record.data_type = 'jsonb' THEN
                        REPLACE(quote_literal((row_json -> column_record.column_name)::TEXT), '''', '''''') || '::jsonb'
                    ELSE
                        row_json ->> column_record.column_name
                END;
                -- Aggregate column names and values for the INSERT statements
                IF columns = '' THEN
                    columns := quote_ident(column_record.column_name);
                    formatted_values := formatted_value;
                ELSE
                    columns := columns || ', ' || quote_ident(column_record.column_name);
                    formatted_values := formatted_values || ', ' || formatted_value;
                END IF;
                -- Build the update_set_clause, excluding the "Id" column
                IF column_record.column_name <> 'Id' THEN
                    IF update_set_clause = '' THEN
                        update_set_clause := quote_ident(column_record.column_name) || ' = ' || formatted_value;
                    ELSE
                        update_set_clause := update_set_clause || ', ' || quote_ident(column_record.column_name) || ' = ' || formatted_value;
                    END IF;
                END IF;
            END LOOP;
            -- Step 4: Construct and return the SQL statement based on the action type
            IF action_type = 'INSERT' THEN
                RETURN QUERY EXECUTE format('
                    SELECT ''INSERT INTO %I (%s) VALUES (%s);''',
                    input_table_name,
                    columns,
                    formatted_values
                );
            ELSIF action_type = 'UPDATE' THEN
                RETURN QUERY EXECUTE format('
                    SELECT ''UPDATE %I SET %s WHERE "Id" = %s;''',
                    input_table_name,
                    update_set_clause,
                    row_json ->> 'Id'
                );
            END IF;
        END LOOP;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
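A minimal usage sketch, assuming two hypothetical tables named Customers and Orders exist in the public schema and carry the "Id", "CreatedOn", and "UpdatedOn" columns the function relies on. The table names and the timestamp are illustrative and not part of the original script.

```sql
-- Hypothetical table names and cutoff timestamp; substitute your own.
SELECT sql_statement
FROM generate_sql_statements(
    ARRAY['Customers', 'Orders'],
    '2024-01-15 00:00:00+00'::timestamptz
);
```

Each returned row holds one generated INSERT or UPDATE statement intended to be run against the restored backup.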
How It Works
The script performs the following steps:
- Iterate Over the Tables: The script accepts an array of table names (table_names) as input. It loops through each table to process rows that have changed (either inserted or updated) since the specified timestamp (reference_timestamp).
- Identify Changes (Inserts and Updates): For each table, it queries the rows where either the "CreatedOn" or the "UpdatedOn" timestamp is more recent than the reference_timestamp. Each identified row is converted to JSON format to facilitate the creation of SQL statements.
- Generate SQL Statements:
  - Inserts: For rows where the "CreatedOn" timestamp is newer than the reference, an INSERT statement is generated.
  - Updates: For rows where the "UpdatedOn" timestamp is newer than the reference but the "CreatedOn" timestamp is not, an UPDATE statement is generated.
- Build and Format SQL Statements:
  - For each column in the table, the script formats the column values to be compatible with SQL syntax. It handles various data types, including strings, dates, and JSONB.
  - For INSERT statements, the script aggregates column names and their corresponding values.
  - For UPDATE statements, it builds a SET clause to update all columns (except the "Id" column) of the specified row.
- Return Generated SQL Statements: Depending on whether an action is identified as an INSERT or UPDATE, the corresponding SQL statement is returned to be executed on the backup database.
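The steps above assume that every listed table has an "Id" key plus "CreatedOn" and "UpdatedOn" timestamps. A hypothetical table that satisfies these assumptions might look like the sketch below; the table name and the "Name" and "Payload" columns are invented for illustration.

```sql
-- Hypothetical table; the script itself only requires "Id", "CreatedOn", and "UpdatedOn".
CREATE TABLE "Customers" (
    "Id"        integer PRIMARY KEY,
    "Name"      text,
    "Payload"   jsonb,
    "CreatedOn" timestamptz NOT NULL DEFAULT now(),
    "UpdatedOn" timestamptz
);
```

The script does not maintain these timestamps itself, so the application (or a trigger) is assumed to set "UpdatedOn" on every change. Because only rows that still exist are scanned, deleted rows do not appear in the generated statements.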
Detailed Script Breakdown
Here's a step-by-step breakdown of the script:
- Step 1 - Column Information: The script reads the table's column metadata (name, data type, and position) from information_schema.columns. This information is necessary to format the column values correctly in the SQL statements.
FOR column_record IN
    SELECT column_name, data_type, ordinal_position
    FROM information_schema.columns
    WHERE table_schema = 'public' AND table_name = input_table_name
    ORDER BY ordinal_position
LOOP
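To see what this metadata looks like, the same query can be run on its own. The sketch below assumes a hypothetical table named Customers in the public schema.

```sql
-- Inspect the metadata the function reads for one (hypothetical) table.
-- data_type holds names such as 'integer', 'text', 'jsonb',
-- or 'timestamp with time zone', which Step 3 matches against.
SELECT column_name, data_type, ordinal_position
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'Customers'
ORDER BY ordinal_position;
```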
- Step 2 - Identify Rows for Changes: Using a dynamic SQL statement, it identifies rows for INSERT or UPDATE. Rows are selected if:
  - The "CreatedOn" timestamp is newer than the reference timestamp (INSERT).
  - The "UpdatedOn" timestamp is newer than the reference, but the "CreatedOn" timestamp is not (UPDATE).
FOR row_json, action_type IN
    EXECUTE format('
        SELECT to_jsonb(cf), CASE
            WHEN cf."CreatedOn" > $1 THEN ''INSERT''
            WHEN cf."UpdatedOn" > $1 AND cf."CreatedOn" <= $1 THEN ''UPDATE''
            ELSE NULL
        END
        FROM %I cf
        WHERE cf."CreatedOn" > $1 OR cf."UpdatedOn" > $1', input_table_name)
    USING reference_timestamp
LOOP
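The classification rule can be tried in isolation. The following sketch applies the same CASE expression to three made-up rows and a made-up cutoff; nothing in it comes from a real table.

```sql
-- Classify sample rows against a cutoff, mirroring the CASE expression above.
WITH sample("Id", "CreatedOn", "UpdatedOn") AS (
    VALUES
        (1, '2024-01-10 09:00+00'::timestamptz, '2024-01-10 09:00+00'::timestamptz),  -- untouched since the cutoff
        (2, '2024-01-10 09:00+00'::timestamptz, '2024-01-20 12:00+00'::timestamptz),  -- updated after the cutoff
        (3, '2024-01-18 08:30+00'::timestamptz, '2024-01-18 08:30+00'::timestamptz)   -- created after the cutoff
)
SELECT s."Id",
       CASE
           WHEN s."CreatedOn" > c.cutoff THEN 'INSERT'
           WHEN s."UpdatedOn" > c.cutoff AND s."CreatedOn" <= c.cutoff THEN 'UPDATE'
       END AS action_type
FROM sample s
CROSS JOIN (SELECT '2024-01-15 00:00+00'::timestamptz AS cutoff) c
WHERE s."CreatedOn" > c.cutoff OR s."UpdatedOn" > c.cutoff
ORDER BY s."Id";
-- Returns two rows: (2, UPDATE) and (3, INSERT); row 1 is filtered out.
```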
- Step 3 - Generate Column Values: For each identified row, the script formats the column values to suit their data type. For example:
  - Strings, dates, and timestamps are enclosed in single quotes, and any existing single quotes are escaped.
  - JSONB values are quoted the same way and cast to JSONB in the output.
  It then aggregates these values into a comma-separated list to be used in the INSERT statements. Similarly, it prepares the SET clause for UPDATE statements, excluding the "Id" column.
formatted_values := '';
columns := '';
update_set_clause := '';
FOR column_record IN
    SELECT column_name, data_type, ordinal_position
    FROM information_schema.columns
    WHERE table_schema = 'public' AND table_name = input_table_name
    ORDER BY ordinal_position
LOOP
    -- quote_literal escapes the value for the generated statement; REPLACE then
    -- doubles every quote again because Step 4 wraps the statement in a string literal.
    formatted_value := CASE
        WHEN (row_json -> column_record.column_name) = 'null' THEN 'NULL'
        WHEN column_record.data_type IN ('character varying', 'text', 'date', 'timestamp without time zone', 'timestamp with time zone') THEN
            REPLACE(quote_literal(row_json ->> column_record.column_name), '''', '''''')
        WHEN column_record.data_type = 'jsonb' THEN
            REPLACE(quote_literal((row_json -> column_record.column_name)::TEXT), '''', '''''') || '::jsonb'
        ELSE
            row_json ->> column_record.column_name
    END;
    -- Aggregate column names and values for the INSERT statements
    IF columns = '' THEN
        columns := quote_ident(column_record.column_name);
        formatted_values := formatted_value;
    ELSE
        columns := columns || ', ' || quote_ident(column_record.column_name);
        formatted_values := formatted_values || ', ' || formatted_value;
    END IF;
    -- Build the update_set_clause, excluding the "Id" column
    IF column_record.column_name <> 'Id' THEN
        IF update_set_clause = '' THEN
            update_set_clause := quote_ident(column_record.column_name) || ' = ' || formatted_value;
        ELSE
            update_set_clause := update_set_clause || ', ' || quote_ident(column_record.column_name) || ' = ' || formatted_value;
        END IF;
    END IF;
END LOOP;
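Escaping is the delicate part of this step. Step 4 embeds the finished statement in a string literal before returning it, so a single quote inside a value is unescaped twice before the statement reaches the backup database. The sketch below uses a made-up value to show what PostgreSQL's built-in quote_literal returns, and what the same literal looks like once its quotes are doubled for the wrapping layer.

```sql
-- The value O'Brien, quoted once for the generated statement and once more
-- for the string literal that wraps the statement.
SELECT quote_literal('O''Brien')                        AS for_generated_sql,  -- 'O''Brien'
       replace(quote_literal('O''Brien'), '''', '''''') AS inside_wrapper;     -- ''O''''Brien''
```

A value that is escaped only once ends up as 'O'Brien' in the generated statement, which is not valid SQL.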
- Step 4 - Construct SQL Statements:
  - For an INSERT, it generates an SQL statement to insert the row into the table with the formatted column values.
  - For an UPDATE, it generates an SQL statement to update the row, identified by its "Id" column.
IF action_type = 'INSERT' THEN
    RETURN QUERY EXECUTE format('
        SELECT ''INSERT INTO %I (%s) VALUES (%s);''',
        input_table_name,
        columns,
        formatted_values
    );
ELSIF action_type = 'UPDATE' THEN
    RETURN QUERY EXECUTE format('
        SELECT ''UPDATE %I SET %s WHERE "Id" = %s;''',
        input_table_name,
        update_set_clause,
        row_json ->> 'Id'
    );
END IF;
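One possible way to package the returned statements for replay is to join them into a single script wrapped in a transaction. This is a sketch under assumptions rather than part of the original script: the Customers table name and the timestamp are hypothetical, and WITH ORDINALITY is used only to keep the statements in the order the function produced them.

```sql
-- Build one replayable script from the generated statements.
SELECT 'BEGIN;' || E'\n'
       || coalesce(string_agg(s.sql_statement, E'\n' ORDER BY s.ord) || E'\n', '')
       || 'COMMIT;' AS replay_script
FROM generate_sql_statements(
         ARRAY['Customers'],
         '2024-01-15 00:00:00+00'::timestamptz
     ) WITH ORDINALITY AS s(sql_statement, ord);
```

Running the resulting script inside a transaction means that a failing statement aborts the whole replay instead of leaving the backup partially updated.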