kafka_fdw
kafka_fdw: Kafka Foreign Data Wrapper for CSV-formatted messages
Overview
| ID | Extension | Package | Version | Category | License | Language |
|---|---|---|---|---|---|---|
| 8730 | kafka_fdw | kafka_fdw | 0.0.3 | FDW | PostgreSQL | C |
| Attribute | Has Binary | Has Library | Need Load | Has DDL | Relocatable | Trusted |
|---|---|---|---|---|---|---|
| --s-d-r | No | Yes | No | Yes | Yes | No |
| Relationships | |
|---|---|
| See Also | pgmq, mongo_fdw, redis_fdw, wrappers, multicorn, redis, hdfs_fdw, wal2json |
Packages
| Type | Repo | Version | PG Major Compatibility | Package Pattern | Dependencies |
|---|---|---|---|---|---|
| EXT | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | kafka_fdw | - |
| RPM | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | kafka_fdw_$v | - |
| DEB | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | postgresql-$v-kafka-fdw | - |
| Linux / PG | PG18 | PG17 | PG16 | PG15 | PG14 |
|---|---|---|---|---|---|
| el8.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el8.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el9.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el9.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el10.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el10.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d12.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d12.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d13.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d13.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
Source

```bash
pig build pkg kafka_fdw;   # build rpm/deb
```

Install

Make sure the PGDG and PIGSTY repos are available:

```bash
pig repo add pgsql -u      # add both repos and update the cache
```

Install this extension with pig:

```bash
pig install kafka_fdw;        # install via package name, for the active PG version
pig install kafka_fdw -v 18;  # install for PG 18
pig install kafka_fdw -v 17;  # install for PG 17
pig install kafka_fdw -v 16;  # install for PG 16
pig install kafka_fdw -v 15;  # install for PG 15
pig install kafka_fdw -v 14;  # install for PG 14
```

Create this extension with:

```sql
CREATE EXTENSION kafka_fdw;
```

Usage
kafka_fdw: Kafka Foreign Data Wrapper for CSV-formatted messages
Create Server
```sql
CREATE EXTENSION kafka_fdw;

CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
    OPTIONS (brokers 'localhost:9092');
```

Server Options: brokers (required; comma-separated Kafka broker endpoints).
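For a multi-broker cluster, the brokers option takes a comma-separated list. A minimal sketch, assuming three hypothetical broker hostnames (kafka1 through kafka3 are placeholders, not from this document):

```sql
-- Sketch: point the FDW at a three-node cluster.
-- Hostnames are hypothetical; replace with your broker endpoints.
CREATE SERVER kafka_cluster FOREIGN DATA WRAPPER kafka_fdw
    OPTIONS (brokers 'kafka1:9092,kafka2:9092,kafka3:9092');
```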
Create User Mapping
```sql
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
```

Create Foreign Table (CSV Format)
```sql
CREATE FOREIGN TABLE kafka_csv (
    part      int    OPTIONS (partition 'true'),
    offs      bigint OPTIONS (offset 'true'),
    some_int  int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', batch_size '30', buffer_delay '100');
```

Two metadata columns are required: one with partition 'true' and one with offset 'true'. The remaining columns must match the message format.
Table Options: format (csv or json), topic (Kafka topic name), batch_size, buffer_delay (milliseconds), strict (enforce strict schema validation), ignore_junk (set malformed columns to NULL).
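As a sketch of the error-tolerance options, assuming both take 'true' as their value, a variant table that nulls out malformed columns instead of failing the scan might look like:

```sql
-- Hypothetical variant of kafka_csv for messy topics:
-- ignore_junk turns unparseable columns into NULLs rather than errors.
CREATE FOREIGN TABLE kafka_csv_lax (
    part      int    OPTIONS (partition 'true'),
    offs      bigint OPTIONS (offset 'true'),
    some_int  int,
    some_text text
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', ignore_junk 'true');
```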
Create Foreign Table (JSON Format)
```sql
CREATE FOREIGN TABLE kafka_json (
    part      int    OPTIONS (partition 'true'),
    offs      bigint OPTIONS (offset 'true'),
    some_int  int    OPTIONS (json 'int_val'),
    some_text text   OPTIONS (json 'text_val')
)
SERVER kafka_server
OPTIONS (format 'json', topic 'my_json_topic', batch_size '30', buffer_delay '100');
```

Use the json column option to map column names to JSON keys.
Consuming Messages
```sql
-- Read from a specific partition and offset
SELECT * FROM kafka_csv WHERE part = 0 AND offs > 1000 LIMIT 60;

-- Read from multiple partitions
SELECT * FROM kafka_csv
WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300);
```

Note: The offset keyword is reserved in SQL; use double quotes when referencing the offset column in some contexts.
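For instance, with a hypothetical table (kafka_alt, not from this document) whose offset metadata column is itself named offset, the quoted form would be needed:

```sql
-- "offset" must be double-quoted here because it is a reserved word
SELECT * FROM kafka_alt WHERE part = 0 AND "offset" > 1000 LIMIT 10;
```

Naming the column offs, as in the examples above, sidesteps the quoting issue entirely.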
Producing Messages
```sql
-- Insert with explicit partition
INSERT INTO kafka_csv (part, some_int, some_text)
VALUES (0, 42, 'hello from partition 0');

-- Insert with auto-partition selection
INSERT INTO kafka_csv (some_int, some_text)
VALUES (42, 'auto-partitioned message');
```
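A produced message can be read back through the same foreign table. A sketch, assuming partition 0 was used; the exact offs value is assigned by Kafka and is not known in advance, so the query scans from offset 0 and sorts locally:

```sql
-- Fetch the most recent message on partition 0 of the topic
SELECT part, offs, some_int, some_text
FROM kafka_csv
WHERE part = 0 AND offs >= 0
ORDER BY offs DESC
LIMIT 1;
```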