pg_incremental
pg_incremental : Incremental Processing by Crunchy Data
Overview
| ID | Extension | Package | Version | Category | License | Language |
|---|---|---|---|---|---|---|
| 2850 | pg_incremental | pg_incremental | 1.4.1 | FEAT | PostgreSQL | C |
| Attribute | Has Binary | Has Library | Need Load | Has DDL | Relocatable | Trusted |
|---|---|---|---|---|---|---|
| --s-d-- | No | Yes | No | Yes | No | No |
| Relationships | |
|---|---|
| Schemas | pg_catalog |
| Requires | pg_cron |
| See Also | age, hll, rum, pg_graphql, pg_jsonschema, jsquery, pg_hint_plan |
Packages
| Type | Repo | Version | PG Major Compatibility | Package Pattern | Dependencies |
|---|---|---|---|---|---|
| EXT | PIGSTY | 1.4.1 | 18, 17, 16, 15, 14 | pg_incremental | pg_cron |
| RPM | PIGSTY | 1.4.1 | 18, 17, 16, 15, 14 | pg_incremental_$v | pg_cron_$v |
| DEB | PIGSTY | 1.4.1 | 18, 17, 16, 15, 14 | postgresql-$v-pg-incremental | postgresql-$v-cron |
| Linux / PG | PG18 | PG17 | PG16 | PG15 | PG14 |
|---|---|---|---|---|---|
| el8.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el8.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el9.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el9.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el10.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el10.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d12.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d12.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d13.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d13.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u22.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u22.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u24.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u24.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
Source
Build the rpm/deb packages from source with pig:

```bash
pig build pkg pg_incremental;   # build rpm/deb
```

Install
Make sure the PGDG and PIGSTY repos are available:

```bash
pig repo add pgsql -u   # add both repos and update cache
```

Install this extension with pig:

```bash
pig install pg_incremental;         # install via package name, for the active PG version
pig install pg_incremental -v 18;   # install for PG 18
pig install pg_incremental -v 17;   # install for PG 17
pig install pg_incremental -v 16;   # install for PG 16
```

Create this extension with:

```sql
CREATE EXTENSION pg_incremental CASCADE; -- requires pg_cron
```

Usage
The pg_incremental extension provides fast, reliable incremental batch processing pipelines in PostgreSQL. It defines parameterized queries that execute periodically for new data, ensuring exactly-once delivery.
```sql
CREATE EXTENSION pg_incremental CASCADE; -- depends on pg_cron
```

Pipeline Types
There are three types of pipelines:
- Sequence pipelines – Process ranges of sequence values from a table
- Time interval pipelines – Process time ranges after intervals pass
- File list pipelines – Process new files from a file listing function
Sequence Pipeline
Create a pipeline that incrementally aggregates new rows using a sequence:
```sql
SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  INSERT INTO events_agg
  SELECT date_trunc('day', event_time), count(*)
  FROM events
  WHERE event_id BETWEEN $1 AND $2
  GROUP BY 1
  ON CONFLICT (day) DO UPDATE SET event_count = events_agg.event_count + excluded.event_count
$$);
```

$1 and $2 are set to the lowest and highest sequence values that can be safely processed.
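This example assumes a source table with a sequence-backed key (which the pipeline tracks) and a rollup table with a unique day column (which the ON CONFLICT clause targets). A minimal sketch of such schemas, with illustrative names, might be:

```sql
-- Hypothetical schemas matching the pipeline above; names are illustrative.
-- The source table needs a sequence-backed column for progress tracking.
CREATE TABLE events (
    event_id   bigserial,                            -- sequence-backed key
    event_time timestamptz NOT NULL DEFAULT now()
);

-- The rollup table needs a unique key for ON CONFLICT to resolve against.
CREATE TABLE events_agg (
    day         timestamptz PRIMARY KEY,
    event_count bigint NOT NULL
);
```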
With batch size limiting:

```sql
SELECT incremental.create_sequence_pipeline(
  'event-aggregation', 'events',
  $$ ... $$,
  schedule := '* * * * *',
  max_batch_size := 10000
);
```

Time Interval Pipeline
Process data in fixed time intervals:
```sql
SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  INSERT INTO events_agg
  SELECT event_time::date, count(distinct event_id)
  FROM events
  WHERE event_time >= $1 AND event_time < $2
  GROUP BY 1
$$);
```

$1 and $2 are set to the start (inclusive) and end (exclusive) of the time range.
For per-interval execution (e.g., daily exports):
```sql
SELECT incremental.create_time_interval_pipeline('event-export',
  time_interval := '1 day',
  batched := false,
  start_time := '2024-11-01',
  command := $$ SELECT export_events($1, $2) $$
);
```

File List Pipeline
Process new files as they appear:
```sql
SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
  SELECT import_events($1)
$$);
```

Management Functions
| Function | Description |
|---|---|
| incremental.execute_pipeline(name) | Manually execute a pipeline (only if new data exists) |
| incremental.reset_pipeline(name) | Reset a pipeline to reprocess from the beginning |
| incremental.drop_pipeline(name) | Remove a pipeline |
| incremental.skip_file(pipeline, path) | Skip a faulty file in a file list pipeline |
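As an example of combining these, a backfill after changing the aggregation logic could reset and immediately re-run the hypothetical 'event-aggregation' pipeline from the earlier sections (clearing the rollup table first is an assumption of this sketch, not a pg_incremental requirement):

```sql
-- Rewind the pipeline and clear its previous output in one transaction:
BEGIN;
TRUNCATE events_agg;                                     -- discard old rollups
SELECT incremental.reset_pipeline('event-aggregation');  -- rewind progress
COMMIT;

-- Run the pipeline immediately instead of waiting for its schedule:
SELECT incremental.execute_pipeline('event-aggregation');
```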
Monitoring
```sql
SELECT * FROM incremental.sequence_pipelines;
SELECT * FROM incremental.time_interval_pipelines;
SELECT * FROM incremental.processed_files;
```

Check job outcomes via pg_cron:

```sql
SELECT jobname, start_time, status, return_message
FROM cron.job_run_details JOIN cron.job USING (jobid)
WHERE jobname LIKE 'pipeline:%' ORDER BY start_time DESC LIMIT 5;
```
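Since pipeline jobs carry the `pipeline:` prefix in pg_cron, the same tables can be filtered down to one pipeline's run history (the pipeline name below is the hypothetical one used throughout this page):

```sql
SELECT start_time, end_time, status, return_message
FROM cron.job_run_details JOIN cron.job USING (jobid)
WHERE jobname = 'pipeline:event-aggregation'   -- hypothetical pipeline name
ORDER BY start_time DESC LIMIT 10;
```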