Bun project that processes all Kafka messages sent by Debezium #11151

Open
Dave93 opened this issue May 17, 2024 · 0 comments
Labels
crash An issue that could cause a crash

Dave93 commented May 17, 2024

How can we reproduce the crash?

Debezium streams a Postgres database to Kafka. When there is a large backlog of unprocessed messages, Bun processes some of them and then crashes. I can provide a backup of the Postgres database if needed.
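As an alternative to the database backup, a backlog could be approximated with synthetic messages. The sketch below is only an assumption about what a minimal event looks like: `makeBacklog` is a hypothetical helper, the envelope fields (`payload.before`, `payload.after`, `payload.source.table`, `op`, `ts_ms`) are the ones the consumer code in this report reads, and the column names are borrowed from the log output. It has not been confirmed that such messages trigger the same crash.

```typescript
// Hypothetical helper: generates synthetic Debezium-style messages shaped like
// the envelope the consumer in this report reads. Not part of the original code.
function makeBacklog(table: string, count: number): string[] {
  const messages: string[] = [];
  for (let i = 0; i < count; i++) {
    // Deterministic UUID-shaped id so runs are repeatable.
    const row = {
      id: `00000000-0000-0000-0000-${String(i).padStart(12, "0")}`,
      duration: i % 600,
      action: "STATUS_CHANGE",
    };
    messages.push(
      JSON.stringify({
        // "c" is the Debezium op code for a create event; ts_ms is a placeholder.
        payload: { before: null, after: row, source: { table }, op: "c", ts_ms: 0 },
      }),
    );
  }
  return messages;
}
```

Each string could be produced to a topic matching the subscription pattern, or passed straight to the message handler, to see whether the crash depends on Kafka at all.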

JavaScript/TypeScript code that reproduces the crash?

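import { Database } from "duckdb-async";
// Assumed import: the report omits it, but the consumer API used below matches KafkaJS.
import { Kafka } from "kafkajs";

// Open the DuckDB database at the path given by DUCK_PATH.
const db = await Database.create(process.env.DUCK_PATH!);
const kafka = new Kafka({
  clientId: "arryy-duckdb",
  brokers: ["127.0.0.1:9092"],
});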

  const consumer = kafka.consumer({
    groupId: process.env.KAFKA_GROUP_ID || "test-group",

  });

  await consumer.connect();
  await consumer.subscribe({
    topics: [/arryt\.public.*/]
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const object = JSON.parse(message.value?.toString() || "{}");
      // console.log("debezium event", object);
      if ('payload' in object) {
        const {
          payload: { before, after, source, op, ts_ms, source: { table } },
        } = object;
        console.log('table ', table)
        try {
          if (!before) {
            const existingRecord = await db.all(`SELECT id FROM ${table} WHERE id = '${after.id}'`);
            if (existingRecord.length === 0) {
              console.log(`
          INSERT INTO ${table} (${Object.keys(after).join(", ")}) VALUES (${Object.values(after).map((val) => (typeof val === "string" ? `'${val.replace(/'/g, "''")}'` : val)).join(", ")})
          `)
              db.all(`
          INSERT INTO ${table} (${Object.keys(after).join(", ")}) VALUES (${Object.values(after).map((val) => (typeof val === "string" ? `'${val.replace(/'/g, "''")}'` : val)).join(", ")})
          `)
            } else {
              console.log(`
          UPDATE ${table} SET ${Object.keys(after).map((key) => `${key} = ${typeof after[key] === "string" ? `'${after[key].replace(/'/g, "''")}'` : after[key]}`).join(", ")} WHERE id = '${after.id}'
          `)
              db.all(`
          UPDATE ${table} SET ${Object.keys(after).map((key) => `${key} = ${typeof after[key] === "string" ? `'${after[key].replace(/'/g, "''")}'` : after[key]}`).join(", ")} WHERE id = '${after.id}'
          `)
            }

          } else if (!after) {
            console.log(`
        DELETE FROM ${table} WHERE id = '${before.id}'
        `)
            db.all(`
        DELETE FROM ${table} WHERE id = '${before.id}'
        `)
          }

        } catch (e) {
          console.log('error', e)
        }
      }

    },
  });
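To help narrow down whether the crash comes from the Kafka consumer or from the DuckDB calls, the SQL-building logic above can be pulled into a pure function and exercised on its own. This is a minimal sketch, not the reporter's code: `DebeziumPayload`, `sqlLiteral`, and `buildStatement` are hypothetical names, and the `exists` flag stands in for the `SELECT id ...` lookup. The quoting mirrors the report (string interpolation, single quotes doubled), so it is still not safe against untrusted input.

```typescript
// Hypothetical types and helper names; only the field names
// (before, after, source.table) come from the code in this report.
type Row = Record<string, unknown>;

interface DebeziumPayload {
  before: Row | null;
  after: Row | null;
  source: { table: string };
}

// Quote a value the way the report does: strings are wrapped in single quotes
// with embedded quotes doubled; everything else is interpolated as-is.
function sqlLiteral(val: unknown): string {
  return typeof val === "string" ? `'${val.replace(/'/g, "''")}'` : String(val);
}

// Build the statement for one event. `exists` replaces the SELECT lookup.
// Returns null for events the report's branches do not cover.
function buildStatement(payload: DebeziumPayload, exists: boolean): string | null {
  const { before, after, source: { table } } = payload;
  if (!before && after) {
    const keys = Object.keys(after);
    if (!exists) {
      const values = keys.map((k) => sqlLiteral(after[k])).join(", ");
      return `INSERT INTO ${table} (${keys.join(", ")}) VALUES (${values})`;
    }
    const assignments = keys.map((k) => `${k} = ${sqlLiteral(after[k])}`).join(", ");
    return `UPDATE ${table} SET ${assignments} WHERE id = '${String(after.id)}'`;
  }
  if (before && !after) {
    return `DELETE FROM ${table} WHERE id = '${String(before.id)}'`;
  }
  return null;
}
```

Running a backlog through `buildStatement` without touching DuckDB, and then running the resulting statements against DuckDB without Kafka, would show which half is needed for the segmentation fault.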

Relevant log output

UPDATE order_actions SET id = 'd49099f9-8cf2-41fc-80db-dd3de8277d0d', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 3, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "Доставлен"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T07:08:43.32600Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = 'd49099f9-8cf2-41fc-80db-dd3de8277d0d'
          
table  order_actions

          UPDATE order_actions SET id = 'b80ac290-e5ec-44f8-8955-94ee6ba8cebf', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 587, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "В пути"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T07:08:33.48100Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = 'b80ac290-e5ec-44f8-8955-94ee6ba8cebf'
          
table  order_actions

          UPDATE order_actions SET id = 'abbffe4c-9707-41ef-909c-c0fd616f3952', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 6, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "Ожидает гостя"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T07:08:39.61400Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = 'abbffe4c-9707-41ef-909c-c0fd616f3952'
          
table  order_actions

          UPDATE order_actions SET id = '63fe42fa-9095-4b39-b089-9fd36bf51ecc', order_id = 'c047ed39-094d-4ed8-80a9-b93733a38700', order_created_at = '2023-11-10T06:45:04.66400Z', duration = 0, action = 'STATUS_CHANGE', action_text = 'Статус заказа изменен на "В филиале"', terminal_id = 'b1a884a3-7096-4a8c-8425-8f824fbbf59c', created_at = '2023-11-10T06:58:46.27400Z', created_by = '51f0c64c-dcef-486e-8a6c-2577af25e23b' WHERE id = '63fe42fa-9095-4b39-b089-9fd36bf51ecc'
          
table  order_actions
============================================================
Bun v1.1.8 (89d25807) Linux x64
Args: "bun", "run", "src/index.ts"
Features: jsc dotenv http_server transpiler_cache(2) tsconfig_paths tsconfig(4) 
Builtins: "bun:main" "detect-libc" "node:assert" "node:buffer" "node:crypto" "node:events" "node:fs" "node:http" "node:net" "node:os" "node:path" "node:stream" "node:string_decoder" "node:tls" "node:url" "node:util" "node:util/types" "node:http2" 
Elapsed: 6735ms | User: 6700ms | Sys: 910ms
RSS: 1.05GB | Peak: 0.20GB | Commit: 1.05GB | Faults: 4

panic(main thread): Segmentation fault at address 0x4E6000080208
oh no: Bun has crashed. This indicates a bug in Bun, not your code.

Stack Trace (bun.report)

Bun v1.1.8 (89d2580) on linux x86_64 [RunCommand]

Segmentation fault at address 0x4E6000080208

Dave93 added the crash label on May 17, 2024