Class: Whodunit::Chronicles::Adapters::PostgreSQL

Inherits:
StreamAdapter
  • Object
show all
Defined in:
lib/whodunit/chronicles/adapters/postgresql.rb

Overview

PostgreSQL logical replication adapter

Uses PostgreSQL’s logical replication functionality to stream database changes via WAL decoding without impacting application performance.

Constant Summary collapse

DEFAULT_PLUGIN =
'pgoutput'

Instance Attribute Summary collapse

Attributes inherited from StreamAdapter

#logger, #position, #running

Instance Method Summary collapse

Methods inherited from StreamAdapter

#log, #streaming?

Constructor Details

#initialize(database_url: Chronicles.config.database_url, publication_name: Chronicles.config.publication_name, slot_name: Chronicles.config.replication_slot_name, logger: Chronicles.logger) ⇒ PostgreSQL

Returns a new instance of PostgreSQL.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 17

def initialize(
  database_url: Chronicles.config.database_url,
  publication_name: Chronicles.config.publication_name,
  slot_name: Chronicles.config.replication_slot_name,
  logger: Chronicles.logger
)
  super(logger: logger)
  @database_url = database_url
  @publication_name = publication_name
  @slot_name = slot_name
  @connection = nil
  @replication_connection = nil
  @last_lsn = nil
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



15
16
17
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15

def connection
  @connection
end

#publication_nameObject (readonly)

Returns the value of attribute publication_name.



15
16
17
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15

def publication_name
  @publication_name
end

#replication_connectionObject (readonly)

Returns the value of attribute replication_connection.



15
16
17
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15

def replication_connection
  @replication_connection
end

#slot_nameObject (readonly)

Returns the value of attribute slot_name.



15
16
17
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 15

def slot_name
  @slot_name
end

Instance Method Details

#build_copy_statementObject (private)



168
169
170
171
172
173
174
175
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 168

def build_copy_statement
  options = [
    "proto_version '1'",
    "publication_names '#{publication_name}'",
  ].join(', ')

  "COPY (SELECT * FROM pg_logical_slot_get_changes('#{slot_name}', NULL, NULL, #{options})) TO STDOUT"
end

#close_connectionsObject (private)



137
138
139
140
141
142
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 137

def close_connections
  @connection&.close
  @replication_connection&.close
  @connection = nil
  @replication_connection = nil
end

#confirmed_flush_lsnObject (private)



262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 262

def confirmed_flush_lsn
  sql = 'SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1'
  result = @connection.exec_params(sql, [slot_name])

  if result.ntuples.positive?
    lsn = result.first['confirmed_flush_lsn']
    result.clear
    lsn
  else
    result.clear
    nil
  end
end

#create_publicationObject (private)



202
203
204
205
206
207
208
209
210
211
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 202

def create_publication
  if publication_exists?
    log(:info, 'Publication already exists', name: publication_name)
    return
  end

  sql = "CREATE PUBLICATION #{publication_name} FOR ALL TABLES"
  @connection.exec(sql)
  log(:info, 'Created publication', name: publication_name)
end

#create_replication_slotObject (private)



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 221

def create_replication_slot
  if replication_slot_exists?
    log(:info, 'Replication slot already exists', name: slot_name)
    return
  end

  sql = "SELECT pg_create_logical_replication_slot('#{slot_name}', '#{DEFAULT_PLUGIN}')"
  result = @connection.exec(sql)
  slot_info = result.first

  log(:info, 'Created replication slot',
    name: slot_name,
    lsn: slot_info['lsn'])
ensure
  result&.clear
end

#current_positionObject

Get current replication position



64
65
66
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 64

def current_position
  @last_lsn || confirmed_flush_lsn
end

#drop_publicationObject (private)



213
214
215
216
217
218
219
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 213

def drop_publication
  return unless publication_exists?

  sql = "DROP PUBLICATION IF EXISTS #{publication_name}"
  @connection.exec(sql)
  log(:info, 'Dropped publication', name: publication_name)
end

#drop_replication_slotObject (private)



238
239
240
241
242
243
244
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 238

def drop_replication_slot
  return unless replication_slot_exists?

  sql = "SELECT pg_drop_replication_slot('#{slot_name}')"
  @connection.exec(sql)
  log(:info, 'Dropped replication slot', name: slot_name)
end

#ensure_setupObject (private)

Raises:



144
145
146
147
148
149
150
151
152
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 144

def ensure_setup
  unless publication_exists?
    raise ReplicationError, "Publication '#{publication_name}' does not exist. Run #setup first."
  end

  return if replication_slot_exists?

  raise ReplicationError, "Replication slot '#{slot_name}' does not exist. Run #setup first."
end

#establish_connectionObject (private)



118
119
120
121
122
123
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 118

def establish_connection
  return if @connection && !@connection.finished?

  @connection = PG.connect(@database_url)
  @connection.type_map_for_results = PG::BasicTypeMapForResults.new(@connection)
end

#establish_connectionsObject (private)



113
114
115
116
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 113

def establish_connections
  establish_connection
  establish_replication_connection
end

#establish_replication_connectionObject (private)



125
126
127
128
129
130
131
132
133
134
135
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 125

def establish_replication_connection
  return if @replication_connection && !@replication_connection.finished?

  # Parse connection URL and add replication parameter
  uri = URI.parse(@database_url)
  repl_params = URI.decode_www_form(uri.query || '')
  repl_params << %w[replication database]
  uri.query = URI.encode_www_form(repl_params)

  @replication_connection = PG.connect(uri.to_s)
end

#parse_logical_message(data) ⇒ Object (private)



191
192
193
194
195
196
197
198
199
200
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 191

def parse_logical_message(data)
  # Simplified parser for demonstration
  # Real implementation would parse pgoutput binary protocol
  lines = data.strip.split("\n")
  return unless lines.any?

  # This is a placeholder - would need full pgoutput protocol parsing
  log(:debug, 'Parsed logical message', lines: lines.size)
  nil
end

#process_wal_data(data) ⇒ Object (private)



177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 177

def process_wal_data(data)
  # Parse pgoutput protocol message
  # This is a simplified version - full implementation would need
  # to properly decode the binary protocol
  log(:debug, 'Processing WAL data', size: data.bytesize)

  # For now, we'll parse text-based logical decoding output
  # In production, this should parse the binary pgoutput format
  change_event = parse_logical_message(data)
  yield(change_event) if change_event
rescue StandardError => e
  log(:error, 'Error processing WAL data', error: e.message, data: data.inspect)
end

#publication_exists?Boolean (private)

Returns:

  • (Boolean)


246
247
248
249
250
251
252
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 246

def publication_exists?
  sql = 'SELECT 1 FROM pg_publication WHERE pubname = $1'
  result = @connection.exec_params(sql, [publication_name])
  exists = result.ntuples.positive?
  result.clear
  exists
end

#replication_slot_exists?Boolean (private)

Returns:

  • (Boolean)


254
255
256
257
258
259
260
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 254

def replication_slot_exists?
  sql = 'SELECT 1 FROM pg_replication_slots WHERE slot_name = $1'
  result = @connection.exec_params(sql, [slot_name])
  exists = result.ntuples.positive?
  result.clear
  exists
end

#setupObject

Set up logical replication (publication and slot)



69
70
71
72
73
74
75
76
77
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 69

def setup
  log(:info, 'Setting up PostgreSQL logical replication')

  establish_connection
  create_publication
  create_replication_slot

  log(:info, 'PostgreSQL setup completed successfully')
end

#start_streamingObject

Start streaming logical replication changes

Raises:

  • (ArgumentError)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 33

def start_streaming(&)
  raise ArgumentError, 'Block required for processing events' unless block_given?

  log(:info, 'Starting PostgreSQL logical replication streaming')

  establish_connections
  ensure_setup

  self.running = true
  self.position = confirmed_flush_lsn || '0/0'

  log(:info, 'Starting replication from LSN', lsn: @position)

  begin
    stream_changes(&)
  rescue StandardError => e
    log(:error, 'Streaming error', error: e.message, backtrace: e.backtrace.first(5))
    raise ReplicationError, "Failed to stream changes: #{e.message}"
  ensure
    self.running = false
  end
end

#stop_streamingObject

Stop streaming



57
58
59
60
61
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 57

def stop_streaming
  log(:info, 'Stopping PostgreSQL logical replication streaming')
  self.running = false
  close_connections
end

#stream_changesObject (private)



154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 154

def stream_changes(&)
  copy_sql = build_copy_statement
  log(:debug, 'Starting COPY command', sql: copy_sql)

  @replication_connection.exec(copy_sql)

  while running?
    data = @replication_connection.get_copy_data(async: false)
    break unless data

    process_wal_data(data, &)
  end
end

#teardownObject

Remove logical replication setup



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 80

def teardown
  log(:info, 'Tearing down PostgreSQL logical replication')

  establish_connection
  drop_replication_slot
  drop_publication

  log(:info, 'PostgreSQL teardown completed')
ensure
  close_connections
end

#test_connectionObject

Test database connection



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/whodunit/chronicles/adapters/postgresql.rb', line 93

def test_connection
  establish_connection
  result = @connection.exec('SELECT current_database(), current_user, version()')
  db_info = result.first

  log(:info, 'Connection test successful',
    database: db_info['current_database'],
    user: db_info['current_user'],
    version: db_info['version'])

  true
rescue PG::Error => e
  log(:error, 'Connection test failed', error: e.message)
  false
ensure
  result&.clear
end