Class: Mammoth::DeliveredEnvelopeStore

Inherits:
Object
  • Object
show all
Defined in:
lib/mammoth/delivered_envelope_store.rb

Overview

SQLite-backed ledger of downstream deliveries.

The PostgreSQL replication boundary is at-least-once. Mammoth therefore keeps a small delivery ledger so a transaction replayed by pgoutput-client after restart does not have to be delivered downstream again.

Constant Summary collapse

SCHEMA =
<<~SQL
  CREATE TABLE IF NOT EXISTS delivered_envelopes (
    id INTEGER PRIMARY KEY,
    idempotency_key TEXT NOT NULL,
    source_name TEXT NOT NULL,
    slot_name TEXT NOT NULL,
    destination_name TEXT NOT NULL,
    delivery_unit TEXT NOT NULL,
    transaction_id TEXT,
    source_position TEXT,
    delivered_at TEXT NOT NULL,

    UNIQUE (idempotency_key)
  );

  CREATE INDEX IF NOT EXISTS idx_delivered_envelopes_source
  ON delivered_envelopes(source_name, slot_name);

  CREATE INDEX IF NOT EXISTS idx_delivered_envelopes_destination
  ON delivered_envelopes(destination_name);

  CREATE INDEX IF NOT EXISTS idx_delivered_envelopes_source_position
  ON delivered_envelopes(source_position);
SQL

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sqlite_store) ⇒ DeliveredEnvelopeStore

Returns a new instance of DeliveredEnvelopeStore.



39
40
41
42
# File 'lib/mammoth/delivered_envelope_store.rb', line 39

def initialize(sqlite_store)
  @sqlite_store = sqlite_store
  ensure_schema!
end

Instance Attribute Details

#sqlite_storeObject (readonly)

Returns the value of attribute sqlite_store.



37
38
39
# File 'lib/mammoth/delivered_envelope_store.rb', line 37

def sqlite_store
  @sqlite_store
end

Instance Method Details

#allObject



78
79
80
# File 'lib/mammoth/delivered_envelope_store.rb', line 78

def all
  database.execute("SELECT * FROM delivered_envelopes ORDER BY id")
end

#countObject



82
83
84
# File 'lib/mammoth/delivered_envelope_store.rb', line 82

def count
  database.get_first_value("SELECT COUNT(*) FROM delivered_envelopes")
end

#delivered?(idempotency_key) ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
48
49
# File 'lib/mammoth/delivered_envelope_store.rb', line 44

def delivered?(idempotency_key)
  !database.execute(
    "SELECT 1 FROM delivered_envelopes WHERE idempotency_key = ? LIMIT 1",
    [idempotency_key]
  ).empty?
end

#record!(idempotency_key:, source_name:, slot_name:, destination_name:, delivery_unit:, transaction_id:, source_position:) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/mammoth/delivered_envelope_store.rb', line 51

def record!(idempotency_key:, source_name:, slot_name:, destination_name:, delivery_unit:, transaction_id:,
            source_position:)
  database.execute(
    <<~SQL,
      INSERT OR IGNORE INTO delivered_envelopes(
        idempotency_key, source_name, slot_name, destination_name, delivery_unit,
        transaction_id, source_position, delivered_at
      )
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    SQL
    [
      idempotency_key,
      source_name,
      slot_name,
      destination_name,
      delivery_unit,
      transaction_id,
      source_position,
      Time.now.utc.iso8601
    ]
  )
  database.get_first_row(
    "SELECT * FROM delivered_envelopes WHERE idempotency_key = ? LIMIT 1",
    [idempotency_key]
  )
end