Class: Whodunit::Chronicles::Adapters::PostgreSQL
Overview
PostgreSQL logical replication adapter
Uses PostgreSQL’s logical replication functionality to stream
database changes via WAL decoding without impacting application performance.
Constant Summary
collapse
- DEFAULT_PLUGIN =
'pgoutput'
Instance Attribute Summary collapse
#logger, #position, #running
Instance Method Summary
collapse
#log, #streaming?
Constructor Details
#initialize(database_url: Chronicles.config.database_url, publication_name: Chronicles.config.publication_name, slot_name: Chronicles.config.replication_slot_name, logger: Chronicles.logger) ⇒ PostgreSQL
Returns a new instance of PostgreSQL.
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 17
def initialize(
database_url: Chronicles.config.database_url,
publication_name: Chronicles.config.publication_name,
slot_name: Chronicles.config.replication_slot_name,
logger: Chronicles.logger
)
super(logger: logger)
@database_url = database_url
@publication_name = publication_name
@slot_name = slot_name
@connection = nil
@replication_connection = nil
@last_lsn = nil
end
|
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
15
16
17
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15
def connection
@connection
end
|
#publication_name ⇒ Object
Returns the value of attribute publication_name.
15
16
17
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15
def publication_name
@publication_name
end
|
#replication_connection ⇒ Object
Returns the value of attribute replication_connection.
15
16
17
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15
def replication_connection
@replication_connection
end
|
#slot_name ⇒ Object
Returns the value of attribute slot_name.
15
16
17
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15
def slot_name
@slot_name
end
|
Instance Method Details
#build_copy_statement ⇒ Object
168
169
170
171
172
173
174
175
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 168
def build_copy_statement
options = [
"proto_version '1'",
"publication_names '#{publication_name}'",
].join(', ')
"COPY (SELECT * FROM pg_logical_slot_get_changes('#{slot_name}', NULL, NULL, #{options})) TO STDOUT"
end
|
#close_connections ⇒ Object
137
138
139
140
141
142
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 137
def close_connections
@connection&.close
@replication_connection&.close
@connection = nil
@replication_connection = nil
end
|
#confirmed_flush_lsn ⇒ Object
262
263
264
265
266
267
268
269
270
271
272
273
274
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 262
def confirmed_flush_lsn
sql = 'SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1'
result = @connection.exec_params(sql, [slot_name])
if result.ntuples.positive?
lsn = result.first['confirmed_flush_lsn']
result.clear
lsn
else
result.clear
nil
end
end
|
#create_publication ⇒ Object
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 202
def create_publication
if publication_exists?
log(:info, 'Publication already exists', name: publication_name)
return
end
sql = "CREATE PUBLICATION #{publication_name} FOR ALL TABLES"
@connection.exec(sql)
log(:info, 'Created publication', name: publication_name)
end
|
#create_replication_slot ⇒ Object
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 221
def create_replication_slot
if replication_slot_exists?
log(:info, 'Replication slot already exists', name: slot_name)
return
end
sql = "SELECT pg_create_logical_replication_slot('#{slot_name}', '#{DEFAULT_PLUGIN}')"
result = @connection.exec(sql)
slot_info = result.first
log(:info, 'Created replication slot',
name: slot_name,
lsn: slot_info['lsn'])
ensure
result&.clear
end
|
#current_position ⇒ Object
Get current replication position
64
65
66
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 64
def current_position
@last_lsn || confirmed_flush_lsn
end
|
#drop_publication ⇒ Object
213
214
215
216
217
218
219
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 213
def drop_publication
return unless publication_exists?
sql = "DROP PUBLICATION IF EXISTS #{publication_name}"
@connection.exec(sql)
log(:info, 'Dropped publication', name: publication_name)
end
|
#drop_replication_slot ⇒ Object
238
239
240
241
242
243
244
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 238
def drop_replication_slot
return unless replication_slot_exists?
sql = "SELECT pg_drop_replication_slot('#{slot_name}')"
@connection.exec(sql)
log(:info, 'Dropped replication slot', name: slot_name)
end
|
#ensure_setup ⇒ Object
144
145
146
147
148
149
150
151
152
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 144
def ensure_setup
unless publication_exists?
raise ReplicationError, "Publication '#{publication_name}' does not exist. Run #setup first."
end
return if replication_slot_exists?
raise ReplicationError, "Replication slot '#{slot_name}' does not exist. Run #setup first."
end
|
#establish_connection ⇒ Object
118
119
120
121
122
123
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 118
def establish_connection
return if @connection && !@connection.finished?
@connection = PG.connect(@database_url)
@connection.type_map_for_results = PG::BasicTypeMapForResults.new(@connection)
end
|
#establish_connections ⇒ Object
113
114
115
116
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 113
def establish_connections
establish_connection
establish_replication_connection
end
|
#establish_replication_connection ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 125
def establish_replication_connection
return if @replication_connection && !@replication_connection.finished?
uri = URI.parse(@database_url)
repl_params = URI.decode_www_form(uri.query || '')
repl_params << %w[replication database]
uri.query = URI.encode_www_form(repl_params)
@replication_connection = PG.connect(uri.to_s)
end
|
#parse_logical_message(data) ⇒ Object
191
192
193
194
195
196
197
198
199
200
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 191
def parse_logical_message(data)
lines = data.strip.split("\n")
return unless lines.any?
log(:debug, 'Parsed logical message', lines: lines.size)
nil
end
|
#process_wal_data(data) ⇒ Object
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 177
def process_wal_data(data)
log(:debug, 'Processing WAL data', size: data.bytesize)
change_event = parse_logical_message(data)
yield(change_event) if change_event
rescue StandardError => e
log(:error, 'Error processing WAL data', error: e.message, data: data.inspect)
end
|
#publication_exists? ⇒ Boolean
246
247
248
249
250
251
252
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 246
def publication_exists?
sql = 'SELECT 1 FROM pg_publication WHERE pubname = $1'
result = @connection.exec_params(sql, [publication_name])
exists = result.ntuples.positive?
result.clear
exists
end
|
#replication_slot_exists? ⇒ Boolean
254
255
256
257
258
259
260
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 254
def replication_slot_exists?
sql = 'SELECT 1 FROM pg_replication_slots WHERE slot_name = $1'
result = @connection.exec_params(sql, [slot_name])
exists = result.ntuples.positive?
result.clear
exists
end
|
#setup ⇒ Object
Set up logical replication (publication and slot)
69
70
71
72
73
74
75
76
77
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 69
def setup
log(:info, 'Setting up PostgreSQL logical replication')
establish_connection
create_publication
create_replication_slot
log(:info, 'PostgreSQL setup completed successfully')
end
|
#start_streaming ⇒ Object
Start streaming logical replication changes
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 33
def start_streaming(&)
raise ArgumentError, 'Block required for processing events' unless block_given?
log(:info, 'Starting PostgreSQL logical replication streaming')
establish_connections
ensure_setup
self.running = true
self.position = confirmed_flush_lsn || '0/0'
log(:info, 'Starting replication from LSN', lsn: @position)
begin
stream_changes(&)
rescue StandardError => e
log(:error, 'Streaming error', error: e.message, backtrace: e.backtrace.first(5))
raise ReplicationError, "Failed to stream changes: #{e.message}"
ensure
self.running = false
end
end
|
#stop_streaming ⇒ Object
57
58
59
60
61
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 57
def stop_streaming
log(:info, 'Stopping PostgreSQL logical replication streaming')
self.running = false
close_connections
end
|
#stream_changes ⇒ Object
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 154
def stream_changes(&)
copy_sql = build_copy_statement
log(:debug, 'Starting COPY command', sql: copy_sql)
@replication_connection.exec(copy_sql)
while running?
data = @replication_connection.get_copy_data(async: false)
break unless data
process_wal_data(data, &)
end
end
|
#teardown ⇒ Object
Remove logical replication setup
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 80
def teardown
log(:info, 'Tearing down PostgreSQL logical replication')
establish_connection
drop_replication_slot
drop_publication
log(:info, 'PostgreSQL teardown completed')
ensure
close_connections
end
|
#test_connection ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 93
def test_connection
establish_connection
result = @connection.exec('SELECT current_database(), current_user, version()')
db_info = result.first
log(:info, 'Connection test successful',
database: db_info['current_database'],
user: db_info['current_user'],
version: db_info['version'])
true
rescue PG::Error => e
log(:error, 'Connection test failed', error: e.message)
false
ensure
result&.clear
end
|