
Database

SQLite-based persistence layer for EVE Analytics.

This class is responsible for:

- Creating and managing a local SQLite database
- Inserting parsed combat log data into structured tables
- Managing match metadata and combat event records
- Downloading and loading the EVE Static Data Export (SDE)
- Transforming parsed logs into normalized database entries

It acts as the bridge between in-memory parsed log data and long-term structured storage.

Attributes:

    ea: Main EVE Analytics application context (provides parsed logs).
    db_path (Path): Path to SQLite database file.
    base (Path): Base directory for local storage.
    conn (sqlite3.Connection): Active SQLite connection.
    cursor (sqlite3.Cursor): Database cursor for queries.
    sde_location (Path): Extracted SDE directory path.
    create_tables (list[str]): SQL schema creation statements.

Source code in src/eve_analytics/classes/db.py
class Database:
    """
    SQLite-based persistence layer for EVE Analytics.

    This class is responsible for:
    - Creating and managing a local SQLite database
    - Inserting parsed combat log data into structured tables
    - Managing match metadata and combat event records
    - Downloading and loading EVE Static Data Export (SDE)
    - Transforming parsed logs into normalized database entries

    It acts as the bridge between in-memory parsed log data
    and long-term structured storage.

    Attributes:
        ea: Main EVE Analytics application context (provides parsed logs).
        db_path (Path): Path to SQLite database file.
        base (Path): Base directory for local storage.
        conn (sqlite3.Connection): Active SQLite connection.
        cursor (sqlite3.Cursor): Database cursor for queries.
        sde_location (Path): Extracted SDE directory path.
        create_tables (list[str]): SQL schema creation statements.
    """
    def __init__(self, ea, db_path=None):
        """
        Initializes the database connection and builds schema.

        If no database path is provided, a default local directory
        (~/.eveanalytics) is created and used.

        Workflow:
        - Creates SQLite connection
        - Enables foreign key support
        - Builds schema tables
        - Prepares cursor for queries

        Args:
            ea: EVE Analytics application instance containing parsed logs.
            db_path (Path | None): Optional custom database file path.
        """
        self.ea = ea
        self.sde_location = None
        if db_path is None:
            self.base = Path.home() / ".eveanalytics"
            self.base.mkdir(exist_ok=True)
            self.db_path = self.base / "combat_logs.db"
        else:
            self.db_path = Path(db_path)
            # A custom path still needs a base directory (SDE downloads use it).
            self.base = self.db_path.parent

        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute("PRAGMA foreign_keys = ON;")
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()

        self.create_tables = [
            create_combat_data_sql,
            create_combat_log_types_sql,
            create_matches_sql,
            create_pilot_bridge_sql,
            create_users_sql,
            create_invgroups_sql,
            create_dmgtypeattribs_sql,
            create_invcategories_sql,
            create_invtypes_sql
        ]
        self._build_schema()

    def __repr__(self) -> str:
        """
        Returns a string representation of the database instance.

        Returns:
            str: Database path summary.
        """

        return f"<Database> | {self.db_path}"

    def default_db_load(self):
        """
        Executes the full default ingestion pipeline.

        Steps:
        - Inserts match metadata
        - Inserts parsed combat logs
        - Loads EVE SDE data into database tables
        """
        self.__insert_matches()
        self.__insert_combat_logs()
        self.__load_eve_data()

    def __insert_matches(self):
        """
        Inserts parsed match metadata into the database.

        Each match includes:
        - Match ID (hashed)
        - Start/end timestamps
        - Countdown start timestamp
        - Description and metadata flags
        """
        now = datetime.now(timezone.utc).isoformat()
        sql = """
            INSERT OR IGNORE INTO matches (
                id, description, countdown_start_ts, 
                match_start_ts, match_end_ts,
                create_ts, update_ts, retired
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """
        values = [
            (
                r.get("id", 0),
                r.get('description', ""),
                r.get('countdown'),
                r.get('start'),
                r.get('end'),
                r.get("create_ts", now),
                r.get("update_ts", now),
                int(r.get("retired", False))
            )
            for r in self.ea.parsed_logs.matches if r
        ]

        self.cursor.executemany(sql, values)
        self.conn.commit()

    def __insert_combat_logs(self):
        """
        Inserts all parsed combat log events into the combat_data table.

        This method:
        - Flattens all event categories (damage, reps, neuts, etc.)
        - Normalizes each record via process_json_record()
        - Assigns match IDs and log type IDs
        - Bulk inserts into SQLite
        """
        now = datetime.now(timezone.utc).isoformat()
        sql = """
              INSERT OR IGNORE INTO combat_data (
                  id, match_id, pilot, action_timestamp,
                  amount, action_to, action_from, direction,
                  module, log_type_id, hit_quality,
                  ship, corp_tag, og_line, cleaned_line,
                  create_ts, update_ts, retired
              ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """
        rows = []

        buckets = (
            self.ea.parsed_logs.all_cap_warnings,
            self.ea.parsed_logs.all_cap_reps,
            self.ea.parsed_logs.all_dmg,
            self.ea.parsed_logs.all_drones,
            self.ea.parsed_logs.all_jams,
            self.ea.parsed_logs.all_links,
            self.ea.parsed_logs.all_neuts,
            self.ea.parsed_logs.all_nos,
            self.ea.parsed_logs.all_reloads,
            self.ea.parsed_logs.all_reps,
            self.ea.parsed_logs.all_scrams,
        )
        for bucket in buckets:
            rows.extend(self.process_json_record(log) for log in bucket)

        values = [
            (
                r.get("id"),
                r.get("match_id"),
                r.get("pilot"),
                r.get("action_timestamp"),
                r.get("amount"),
                r.get("action_to"),
                r.get("action_from"),
                r.get("direction"),
                r.get("module"),
                r.get("log_type_id"),
                r.get("hit_quality"),
                r.get("ship"),
                r.get("corp_tag", ""),
                r.get("og_line"),
                r.get("cleaned_line"),
                r.get("create_ts", now),
                r.get("update_ts", now),
                int(r.get("retired", False))
            )
            for r in rows if r
        ]

        self.cursor.executemany(sql, values)
        self.conn.commit()

    # -------------------------
    # Utility
    # -------------------------
    def convert_ms_to_timestamp(self, value):
        """
        Converts a timestamp value to a UTC ISO-8601 string.

        Args:
            value (datetime | int | float | None): A datetime instance, or a
                Unix epoch timestamp expressed in milliseconds.

        Returns:
            str | None: ISO-formatted timestamp, or None for unsupported types.
        """
        if isinstance(value, datetime):
            return value.isoformat()

        if isinstance(value, (int, float)):
            return datetime.fromtimestamp(value / 1000, tz=timezone.utc).isoformat()

        return None

    def process_json_record(self, record):
        """
        Normalizes a parsed combat log record for database insertion.

        Responsibilities:
        - Assigns log type ID
        - Generates deterministic unique ID (SHA256 hash)
        - Normalizes "from" and "to" fields
        - Converts timestamps to UTC ISO format
        - Adds audit fields (create_ts, update_ts)
        - Cleans unused keys

        Args:
            record (dict): Raw parsed log entry.

        Returns:
            dict: Normalized database-ready record.
        """
        record['log_type_id'] = self.get_log_type_id(this_type=record['row_type'])

        raw = f"{record['time'].astimezone(timezone.utc).isoformat()}:{record['pilot']}:{record['direction']}:{record['log_type_id']}:{record['cleaned_line']}".encode()
        record["id"] = hashlib.sha256(raw).hexdigest()

        if "to" in record:
            record['action_to'] = record['to'] if record['to'] else "unknown"
        else:
            record['action_to'] = 'unknown'

        if "from" in record:
            record['action_from'] = record['from'] if record['from'] else "unknown"
        else:
            record['action_from'] = "unknown"

        if "time" in record:
            record["action_timestamp"] = self.convert_ms_to_timestamp(record["time"])

        record["create_ts"] = datetime.now(timezone.utc).isoformat()
        record["update_ts"] = datetime.now(timezone.utc).isoformat()
        record['retired'] = False

        if "module" in record and record["module"] is not None:
            if not isinstance(record["module"], str):
                record["module"] = str(record["module"])

        for key in keys_to_remove:
            record.pop(key, None)

        return record

    def get_log_type_id(self, this_type):
        """
        Resolves a log type string to its corresponding database ID.

        Args:
            this_type (str): Log type name (e.g., 'dmg', 'reps').

        Returns:
            int: Log type ID, or 0 if not found.
        """
        if this_type == "dmg":
            this_type = "damage"

        for t in log_types:
            if t.get("name") == this_type:
                return t.get("id")
        return 0

    def _build_schema(self):
        """
        Creates database tables defined in schema files.

        Executes all SQL schema creation statements sequentially.
        """
        for sql in self.create_tables:
            for statement in sql.split(';'):
                if statement.strip():  # skip the empty fragment after a trailing ';'
                    self.cursor.execute(statement)
        self.conn.commit()


    def _download_sde_zip(self):
        """
        Downloads and extracts the EVE Static Data Export (SDE).

        Process:
        - Removes existing SDE zip and extracted folder (if present)
        - Downloads fresh SDE archive
        - Extracts contents into local directory

        Raises:
            MissingSDEError: If extraction fails or path is invalid.
        """
        zip_path = f"{self.base}/sde.zip"
        unzip_path = f"{self.base}/sde"

        try:
            if os.path.isfile(zip_path):
                os.unlink(zip_path)

            if os.path.isdir(unzip_path):
                shutil.rmtree(unzip_path)

        except Exception as e:
            print(f"Cleanup error: {e}")

        wget.download(sde_url, str(zip_path))
        with zipfile.ZipFile(zip_path, 'r') as z:
            z.extractall(unzip_path)

        self.sde_location = unzip_path

    def __load_eve_data(self):
        """
        Entry point for loading EVE SDE data into the database.
        """
        self._load_sde_data()

    def _load_sde_data(self):
        """
        Loads and processes EVE SDE data into database tables.

        Includes:
        - invtypes
        - invcategories
        - invgroups
        """
        self._download_sde_zip()
        if self.sde_location is None:
            raise MissingSDEError(location=self.base)

        self._load_invtypes()
        self._load_invcategories()
        self._load_invgroups()

    def _load_invtypes(self):
        """
        Loads invtypes data from SDE YAML into memory.

        Extracts:
        - type IDs
        - group IDs
        - race and meta data
        - market grouping

        Then triggers database insertion.
        """
        with open(f'{self.sde_location}/types.yaml', 'rb') as f:
            types_data = yaml.load(f, Loader=yaml.CSafeLoader)

            now = datetime.now(timezone.utc).isoformat()
            inv_list = []
            for key, value in types_data.items():
                type_id = key
                mass = value.get("mass", 0)
                group_id = value.get('groupID', 0)
                type_name = value['name'].get('en')
                race_id = value.get('raceID', 0)
                meta_group_id = value.get("metaGroupID", 0)
                market_group_id = value.get("marketGroupID", 0)
                inv_list.append({
                    'group_id': group_id,
                    'type_id': type_id,
                    'race_id': race_id,
                    'mass': mass,
                    'meta_group_id': meta_group_id,
                    'market_group_id': market_group_id,
                    'type_name': type_name,
                    'create_ts': now,
                    'update_ts': now,
                    'retired': False
                })

        self.inv_values = inv_list
        self._insert_invtypes()

    def _insert_invtypes(self):
        """
        Inserts processed invtypes data into the database.

        Each row represents a ship/item type definition from SDE.
        """
        now = datetime.now(timezone.utc).isoformat()
        sql = """
              INSERT OR IGNORE INTO invtypes (
                  typeId, raceId, metaGroupId,
                  marketGroupId, typeName, 
                  mass, groupId,
                  create_ts, update_ts, retired
              ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """

        values = [
            (
                r.get("type_id"),
                r.get('race_id', 0),
                r.get("meta_group_id", 0),
                r.get("market_group_id", 0),
                r.get("type_name", ""),
                r.get("mass", 0),
                r.get("group_id"),
                r.get("create_ts", now),
                r.get("update_ts", now),
                int(r.get("retired", False))
            )
            for r in self.inv_values if r
        ]

        for v in values:
            try:
                self.cursor.execute(sql, v)
            except Exception as e:
                print("FAILED ROW:", v)
                print("ERROR:", e)
        self.conn.commit()

    def _load_invcategories(self):
        """
        Loads inventory categories from SDE YAML file.

        Builds structured category records for database insertion.
        """

        with open(f'{self.sde_location}/categories.yaml', 'rb') as f:
            cat_data = yaml.load(f, Loader=yaml.CSafeLoader)
            category_list = []
            now = datetime.now(timezone.utc).isoformat()
            for key, value in cat_data.items():
                category_id = key
                name_en = value['name']['en']
                category_list.append({
                    'category_id': category_id,
                    'name': name_en,
                    'create_ts': now,
                    'update_ts': now,
                    'retired': False
                })
        self.category_values = category_list
        self._insert_invcategories()

    def _insert_invcategories(self):
        """
        Inserts inventory category records into the database.
        """
        now = datetime.now(timezone.utc).isoformat()
        sql = """
              INSERT OR IGNORE INTO invcategories (
                  categoryId, name,
                  create_ts, update_ts, retired
              ) VALUES (?, ?, ?, ?, ?)
              """

        values = [
            (
                r.get("category_id", 0),
                r.get('name', ""),
                r.get("create_ts", now),
                r.get("update_ts", now),
                int(r.get("retired", False))
            )
            for r in self.category_values if r
        ]

        self.cursor.executemany(sql, values)
        self.conn.commit()

    def _load_invgroups(self):
        """
        Loads inventory groups from SDE YAML file.

        Groups are linked to categories and represent item classifications.
        """
        with open(f'{self.sde_location}/groups.yaml', 'rb') as f:
            groups_data = yaml.load(f, Loader=yaml.CSafeLoader)
            now = datetime.now(timezone.utc).isoformat()
            group_list = []
            for key, value in groups_data.items():
                category_id = value.get('categoryID', 0)
                name_en = value['name'].get('en')
                group_id = key
                group_list.append({
                    'group_id': group_id,
                    'category_id': category_id,
                    'name': name_en,
                    'create_ts': now,
                    'update_ts': now,
                    'retired': False
                })
        self.group_values = group_list
        self._insert_invgroups()

    def _insert_invgroups(self):
        """
        Inserts inventory group records into the database.
        """
        now = datetime.now(timezone.utc).isoformat()
        sql = """
              INSERT OR IGNORE INTO invgroups (
                  groupId, name, categoryId,
                  create_ts, update_ts, retired
              ) VALUES (?, ?, ?, ?, ?, ?)
              """

        values = [
            (
                r.get("group_id", 0),
                r.get('name', ""),
                r.get("category_id", 0),
                r.get("create_ts", now),
                r.get("update_ts", now),
                int(r.get("retired", False))
            )
            for r in self.group_values if r
        ]

        self.cursor.executemany(sql, values)
        self.conn.commit()

    def execute(self, sql, params=None):
        """
        Executes a raw SQL query and returns results.

        Args:
            sql (str): SQL query string.
            params (tuple | list | None): Optional query parameters.

        Returns:
            list[sqlite3.Row]: Query results.
        """
        return self.cursor.execute(sql, params or ()).fetchall()

    def close(self):
        """
        Closes the SQLite database connection.
        """
        self.conn.close()

__init__(ea, db_path=None)

Initializes the database connection and builds schema.

If no database path is provided, a default local directory (~/.eveanalytics) is created and used.

Workflow:

- Creates SQLite connection
- Enables foreign key support
- Builds schema tables
- Prepares cursor for queries

Parameters:

    ea: EVE Analytics application instance containing parsed logs. Required.
    db_path (Path | None): Optional custom database file path. Default: None.
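The constructor's connection setup can be reproduced standalone. This sketch swaps the on-disk `combat_logs.db` for an in-memory database but keeps the same steps: connect, enable foreign keys (which SQLite leaves off by default), and install the `sqlite3.Row` factory so rows are addressable by column name:

```python
import sqlite3

# Same setup as __init__, but against an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON;")  # off by default in SQLite
conn.row_factory = sqlite3.Row             # rows gain name-based access
cursor = conn.cursor()

# Confirm the pragma took effect for this connection.
fk_enabled = conn.execute("PRAGMA foreign_keys;").fetchone()[0]
print(fk_enabled)  # 1
```

Note that `PRAGMA foreign_keys` is per-connection state, which is why the class issues it immediately after `sqlite3.connect` rather than once at schema-creation time.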

__insert_combat_logs()

Inserts all parsed combat log events into the combat_data table.

This method:

- Flattens all event categories (damage, reps, neuts, etc.)
- Normalizes each record via process_json_record()
- Assigns match IDs and log type IDs
- Bulk inserts into SQLite

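The combination of a deterministic `id`, `INSERT OR IGNORE`, and `executemany` makes ingestion idempotent: re-running the pipeline over the same logs inserts nothing twice. A minimal sketch against a hypothetical two-column stand-in for `combat_data`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE combat_data (id TEXT PRIMARY KEY, pilot TEXT)")

rows = [("a1", "PilotA"), ("b2", "PilotB")]
cur.executemany("INSERT OR IGNORE INTO combat_data (id, pilot) VALUES (?, ?)", rows)
conn.commit()

# Re-running the same batch inserts nothing: the PRIMARY KEY conflict
# is ignored rather than raised.
cur.executemany("INSERT OR IGNORE INTO combat_data (id, pilot) VALUES (?, ?)", rows)
conn.commit()

count = cur.execute("SELECT COUNT(*) FROM combat_data").fetchone()[0]
print(count)  # 2
```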

__insert_matches()

Inserts parsed match metadata into the database.

Each match includes:

- Match ID (hashed)
- Start/end timestamps
- Countdown start timestamp
- Description and metadata flags


__load_eve_data()

Entry point for loading EVE SDE data into the database.

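Under the hood, `_download_sde_zip` wipes any stale archive and re-extracts the SDE before the YAML loaders run. The clean-then-extract flow can be sketched with the standard library alone (the network download, done with `wget` in the source, is replaced here by a locally created zip with one made-up YAML entry):

```python
import shutil
import tempfile
import zipfile
from pathlib import Path

base = Path(tempfile.mkdtemp())
zip_path = base / "sde.zip"
unzip_path = base / "sde"

# Stand-in for the downloaded archive: a zip holding one YAML file.
with zipfile.ZipFile(zip_path, "w") as z:
    z.writestr("types.yaml", "587:\n  name:\n    en: Rifter\n")

# Mirror the cleanup step: remove any previous extraction first.
if unzip_path.is_dir():
    shutil.rmtree(unzip_path)

with zipfile.ZipFile(zip_path, "r") as z:
    z.extractall(unzip_path)

names = sorted(p.name for p in unzip_path.iterdir())
print(names)  # ['types.yaml']
```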

__repr__()

Returns a string representation of the database instance.

Returns:

    str: Database path summary.


close()

Closes the SQLite database connection.


default_db_load()

Executes the full default ingestion pipeline.

Steps:

- Inserts match metadata
- Inserts parsed combat logs
- Loads EVE SDE data into database tables


execute(sql, params=None)

Executes a raw SQL query and returns results.

Parameters:

    sql (str): SQL query string. Required.
    params (tuple | list | None): Optional query parameters. Default: None.

Returns:

    list[sqlite3.Row]: Query results.

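One detail worth noting: `sqlite3` requires its parameters argument to be a sequence, so a bare `None` must be defaulted to an empty tuple before being passed through. A sketch of the wrapper pattern against a throwaway table (the `matches` columns here are simplified stand-ins for the real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute("CREATE TABLE matches (id INTEGER PRIMARY KEY, description TEXT)")
cur.execute("INSERT INTO matches VALUES (1, 'scrim'), (2, 'tournament')")

def run(sql, params=None):
    # Default None to () so parameterless queries work too.
    return cur.execute(sql, params or ()).fetchall()

hits = run("SELECT description FROM matches WHERE id = ?", (2,))
print(hits[0]["description"])  # tournament
```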

get_log_type_id(this_type)

Resolves a log type string to its corresponding database ID.

Parameters:

    this_type (str): Log type name (e.g., 'dmg', 'reps'). Required.

Returns:

    int: Log type ID, or 0 if not found.


process_json_record(record)

Normalizes a parsed combat log record for database insertion.

Responsibilities:

- Assigns log type ID
- Generates deterministic unique ID (SHA256 hash)
- Normalizes "from" and "to" fields
- Converts timestamps to UTC ISO format
- Adds audit fields (create_ts, update_ts)
- Cleans unused keys

Parameters:

    record (dict): Raw parsed log entry. Required.

Returns:

    dict: Normalized database-ready record.

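The hash recipe can be exercised in isolation; the field values below are made up for illustration. Because the ID is derived purely from the record's content, re-parsing the same log line always yields the same key, which is what lets the `INSERT OR IGNORE` ingestion deduplicate on re-runs:

```python
import hashlib
from datetime import datetime, timezone

# Hypothetical field values standing in for a parsed log record.
time = datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
pilot, direction, log_type_id = "PilotA", "to", 1
cleaned_line = "283 from PilotA"

raw = f"{time.astimezone(timezone.utc).isoformat()}:{pilot}:{direction}:{log_type_id}:{cleaned_line}".encode()
record_id = hashlib.sha256(raw).hexdigest()
print(len(record_id))  # 64: the hex digest doubles as a stable TEXT primary key

# The companion timestamp normalization: epoch milliseconds -> UTC ISO-8601.
ms = 1_714_564_800_000
iso = datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat()
print(iso)  # 2024-05-01T12:00:00+00:00
```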