Kafka, Redis, Postgres, Kubernetes를 활용한 실시간 파이프라인 마이크로서비스 프로젝트 방법

Kafka, Redis, Postgres, Kubernetes를 활용한 실시간 파이프라인 마이크로서비스 프로젝트 방법
TILPosted On Jul 9, 202434 min read

소개

이 문서는 데이터를 처리하여 분석을 위해 데이터베이스를 채우는 데 사용되는 실시간 마이크로서비스 프로젝트에 대한 안내서입니다.

이미지

STG-Service

Redis 클라이언트

Redis와 상호 작용하기 위한 간단한 클라이언트입니다. 특정 키로 객체를 가져오거나 새 키-값 쌍을 설정할 수 있습니다:

import json
from typing import Dict
import redis

class RedisClient:
    def __init__(self, host: str, port: int, password: str, cert_path: str) -> None:
        self._client = redis.StrictRedis(
            host=host,
            port=port,
            password=password,
            ssl=True,
            ssl_ca_certs=cert_path)
    def set(self, k, v):
        self._client.set(k, json.dumps(v))
    def get(self, k) -> Dict:
        obj: str = self._client.get(k)
        return json.loads(obj)

Postgres 클라이언트

데이터베이스에 연결하는 것 외에도, 클라이언트는 하나의 컨텍스트 매니저의 일부로 여러 쿼리를 실행할 수 있는 기능을 제공하여 한 트랜잭션의 실행 명령을 커밋할 필요가 없어요 (나중에 사용 예제를 보게 될 거에요):

from contextlib import contextmanager
from typing import Generator
import psycopg2

class PgConnect:
    def __init__(self, host: str, port: int, db_name: str, user: str, pw: str, sslmode: str = "require") -> None:
        self.host = host
        self.port = port
        self.db_name = db_name
        self.user = user
        self.pw = pw
        self.sslmode = sslmode
    def url(self) -> str:
        return """
            host={host}
            port={port}
            dbname={db_name}
            user={user}
            password={pw}
            target_session_attrs=read-write
            sslmode={sslmode}
        """.format(
            host=self.host,
            port=self.port,
            db_name=self.db_name,
            user=self.user,
            pw=self.pw,
            sslmode=self.sslmode)
    @contextmanager
    def connection(self) -> Generator:
        keepalive_kwargs = {
            "keepalives": 1,
            "keepalives_idle": 30,
            "keepalives_interval": 5,
            "keepalives_count": 5,
        }
        conn = psycopg2.connect(self.url(), **keepalive_kwargs)
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()

카프카에서 메시지를 생성하고 사용하는 두 개의 별도 및 간단한 클라이언트:

import json
from typing import Dict, Optional
from confluent_kafka import Consumer, Producer

def error_callback(err):
    print('Something went wrong: {}'.format(err))

class KafkaProducer:
    def __init__(self, host: str, port: int, user: str, password: str, topic: str, cert_path: str) -> None:
        params = {
            'bootstrap.servers': f'{host}:{port}',
            'security.protocol': 'SASL_SSL',
            'ssl.ca.location': cert_path,
            'sasl.mechanism': 'SCRAM-SHA-512',
            'sasl.username': user,
            'sasl.password': password,
            'error_cb': error_callback,
        }
        self.topic = topic
        self.p = Producer(params)
    def produce(self, payload: Dict) -> None:
        self.p.produce(self.topic, json.dumps(payload))
        self.p.flush(10)

class KafkaConsumer:
    def __init__(self,
                 host: str,
                 port: int,
                 user: str,
                 password: str,
                 topic: str,
                 group: str,
                 cert_path: str
                 ) -> None:
        params = {
            'bootstrap.servers': f'{host}:{port}',
            'security.protocol': 'SASL_SSL',
            'ssl.ca.location': cert_path,
            'sasl.mechanism': 'SCRAM-SHA-512',
            'sasl.username': user,
            'sasl.password': password,
            'group.id': group,  # '',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            'error_cb': error_callback,
            'debug': 'all',
            'client.id': 'someclientkey'
        }
        self.topic = topic
        self.c = Consumer(params)
        self.c.subscribe([topic])
    def consume(self, timeout: float = 3.0) -> Optional[Dict]:
        msg = self.c.poll(timeout=timeout)
        if not msg:
            return None
        if msg.error():
            raise Exception(msg.error())
        val = msg.value().decode()
        return json.loads(val)

STG Schema

포스트그레스의 스테이징 레이어로, Kafka에서 나온 로우 메시지를 저장할 한 개의 테이블이 있을 것입니다. 이는 아래와 같이 정의되어 있습니다.

CREATE SCHEMA IF NOT EXISTS stg;

CREATE TABLE IF NOT EXISTS stg.order_events (
    id SERIAL   PRIMARY KEY,
    object_id   INTEGER NOT NULL UNIQUE,
    object_type VARCHAR(20) NOT NULL,
    sent_dttm   TIMESTAMP NOT NULL,
    payload     JSON NOT NULL
);

그리고 이를 실행할 파이썬 함수는:

from lib.pg.pg_connect import PgConnect

def make_stg_migrations(db: PgConnect) -> None:
    with db.connection() as conn:
        with conn.cursor() as cur:
            # STG 스키마의 SQL 정의 경로는 다를 수 있습니다.
            cur.execute(open("stg_schema.sql", "r").read())

프로그램이 db.connection()으로 정의된 컨텍스트 매니저를 벗어나면 명시적으로 실행할 필요 없이 자동으로 커밋됩니다.

메시지 처리

먼저 소비된 카프카 메시지를 STG 포스트그레스 테이블(StgRepository)에 삽입한 다음, 레디스에서 레스토랑 데이터를 풍부하게하여 다른 카프카 클러스터(StgMessageProcessor)를 위한 출력 메시지를 구성해야 합니다.

from datetime import datetime

from lib.pg.pg_connect import PgConnect

class StgRepository:
    def __init__(self, db: PgConnect) -> None:
        self._db = db

    def order_events_insert(self,
                            object_id: int,
                            object_type: str,
                            sent_dttm: datetime,
                            payload: str
                            ) -> None:
        with self._db.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                        INSERT INTO stg.order_events (object_id, object_type, sent_dttm, payload) VALUES (%(object_id)s, %(object_type)s, %(sent_dttm)s, %(payload)s)
                        ON CONFLICT (object_id)
                        DO UPDATE
                        SET object_type = EXCLUDED.object_type,
                            sent_dttm = EXCLUDED.sent_dttm,
                            payload = EXCLUDED.payload;
                    """,
                    {
                        'object_id': object_id,
                        'object_type': object_type,
                        'sent_dttm': sent_dttm,
                        'payload': payload
                    }
                )
import json
from datetime import datetime
from logging import Logger
from lib.kafka_connect.kafka_connectors import KafkaConsumer, KafkaProducer
from lib.redis.redis_client import RedisClient
from stg_loader.repository.stg_repository import StgRepository

class StgMessageProcessor:
    def __init__(self,
                 consumer: KafkaConsumer,
                 producer: KafkaProducer,
                 redis: RedisClient,
                 stg_repository: StgRepository,
                 logger: Logger) -> None:
        self._logger = logger
        self._consumer = consumer
        self._producer = producer
        self._redis = redis
        self._stg_repository = stg_repository
        self._batch_size = 100

    def run(self) -> None:
        self._logger.info(f"{datetime.utcnow()}: START")
        for i in range(self._batch_size):
            msg = self._consumer.consume()
            if not msg:
                continue
            self._stg_repository.order_events_insert(object_id=msg["object_id"],
                                                     object_type=msg["object_type"],
                                                     sent_dttm=msg["sent_dttm"],
                                                     payload=json.dumps(msg["payload"]))
            dst_msg = self._construct_output_message(msg)
            self._producer.produce(dst_msg)
        self._logger.info(f"{datetime.utcnow()}: FINISH")

    def _construct_output_message(self, original_message: dict) -> dict:
        restaurant_id = original_message["payload"]["restaurant"]["id"]
        restaurant_data = self._redis.get(restaurant_id)
        restaurant_name = restaurant_data["name"]
        user_id = original_message["payload"]["user"]["id"]
        user_data = self._redis.get(user_id)
        user_name = user_data["name"]
        user_login = user_data["login"]
        order = original_message["payload"]
        restaurant_menu = {p["_id"]: p for p in restaurant_data["menu"]}
        products = {p["id"]: {**p, "category": restaurant_menu[p["id"]]["category"]} for p in order["order_items"]}
        return {
                "object_id": original_message["object_id"],
                "object_type": original_message["object_type"],
                "payload": {
                    "id": original_message["object_id"],
                    "date": order["date"],
                    "cost": order["cost"],
                    "payment": order["payment"],
                    "status": order["final_status"],
                    "restaurant": {
                        "restaurant_id": restaurant_id,
                        "restaurant_name": restaurant_name
                    },
                    "user": {
                        "user_id": user_id,
                        "user_name": user_name,
                        "user_login": user_login
                    },
                    "products": products
                }
            }

서비스 구성

소스/싱크 카프카, Redis 및 포스트그레스와 연결해야 합니다. 이것은 확실히 많은 구성이 필요하며 환경 변수를 사용해야 하므로 이를 별도의 클래스에서 수용할 것입니다:

import os

from lib.kafka_connect.kafka_connectors import KafkaConsumer, KafkaProducer
from lib.redis.redis_client import RedisClient
from lib.pg.pg_connect import PgConnect
from stg_loader.repository.stg_repository import StgRepository

class AppConfig:
    CERTIFICATE_PATH = '/crt/YandexInternalRootCA.crt'
    DEFAULT_JOB_INTERVAL = 25
    def __init__(self) -> None:
        self.kafka_host = str(os.getenv('KAFKA_HOST') or "")
        self.kafka_port = int(str(os.getenv('KAFKA_PORT')) or 0)
        self.kafka_consumer_username = str(os.getenv('KAFKA_CONSUMER_USERNAME') or "")
        self.kafka_consumer_password = str(os.getenv('KAFKA_CONSUMER_PASSWORD') or "")
        self.kafka_consumer_group = str(os.getenv('KAFKA_CONSUMER_GROUP') or "")
        self.kafka_consumer_topic = str(os.getenv('KAFKA_SOURCE_TOPIC') or "")
        self.kafka_producer_username = str(os.getenv('KAFKA_PRODUCER_USERNAME') or "")
        self.kafka_producer_password = str(os.getenv('KAFKA_PRODUCER_PASSWORD') or "")
        self.kafka_producer_topic = str(os.getenv('KAFKA_DESTINATION_TOPIC') or "")
        self.redis_host = str(os.getenv('REDIS_HOST') or "")
        self.redis_port = int(str(os.getenv('REDIS_PORT')) or 0)
        self.redis_password = str(os.getenv('REDIS_PASSWORD') or "")
        self.pg_host = str(os.getenv('PG_HOST') or "")
        self.pg_port = int(str(os.getenv('PG_PORT')) or 6432)
        self.pg_db_name = str(os.getenv("PG_DB_NAME"))
        self.pg_user = str(os.getenv('PG_USER') or "")
        self.pg_password = str(os.getenv('PG_PASSWORD') or "")
    def kafka_producer(self):
        return KafkaProducer(
            self.kafka_host,
            self.kafka_port,
            self.kafka_producer_username,
            self.kafka_producer_password,
            self.kafka_producer_topic,
            self.CERTIFICATE_PATH
        )
    def kafka_consumer(self):
        return KafkaConsumer(
            self.kafka_host,
            self.kafka_port,
            self.kafka_consumer_username,
            self.kafka_consumer_password,
            self.kafka_consumer_topic,
            self.kafka_consumer_group,
            self.CERTIFICATE_PATH
        )
    def redis_client(self) -> RedisClient:
        return RedisClient(
            self.redis_host,
            self.redis_port,
            self.redis_password,
            self.CERTIFICATE_PATH
        )
    def stg_loader(self) -> StgRepository:
        db: PgConnect = PgConnect(
            self.pg_host,
            self.pg_port,
            self.pg_db_name,
            self.pg_user,
            self.pg_password
        )
        return StgRepository(db)
    def pg_client(self) -> PgConnect:
        return PgConnect(
            self.pg_host,
            self.pg_port,
            self.pg_db_name,
            self.pg_user,
            self.pg_password
        )

STG Service 실행

StgMessageProcessor를 백그라운드 프로세스로 실행해야 합니다 (apscheduler 파이썬 모듈의 BackgroundScheduler를 사용할 것입니다) 그리고 건강 상태를 확인하는 간단한 API를 추가할 것입니다.

import os
import logging

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from stg_loader.stg_message_processor_job import StgMessageProcessor
from app_config import AppConfig
from stg_migrations import make_stg_migrations

app = Flask(__name__)

# 서비스가 정상인지 확인할 수 있는 엔드포인트 생성
@app.get('/health')
def health():
    return 'healthy'

if __name__ == '__main__':
    app.logger.setLevel(logging.DEBUG)
    config = AppConfig()
    make_stg_migrations(config.pg_client())
    proc = StgMessageProcessor(logger=app.logger,
                               consumer=config.kafka_consumer(),
                               producer=config.kafka_producer(),
                               redis=config.redis_client(),
                               stg_repository=config.stg_loader())
    scheduler = BackgroundScheduler()
    scheduler.add_job(func=proc.run, trigger="interval", seconds=config.DEFAULT_JOB_INTERVAL)
    scheduler.start()
    app.run(debug=True, host='0.0.0.0', use_reloader=False)

Dockerfile

스테이징 서비스는 코어크레이트 기반의 쿠버네티스 클러스터에서 실행될 예정이므로, 서비스를 도커 이미지로 만들어주어야 합니다.

FROM python:3.10

RUN apt-get update -y
# 컨테이너 내에서 confluent_kafka 파이썬 모듈이 작동되도록 필요합니다
RUN git clone https://github.com/edenhill/librdkafka && cd librdkafka && ./configure && make && make install && ldconfig
COPY . .
RUN pip install -r requirements.txt
# Kafka 클러스터에 안전한 연결을 위한 인증서 다운로드
RUN mkdir -p /crt
RUN wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" --output-document /crt/YandexInternalRootCA.crt
RUN chmod 0600 /crt/YandexInternalRootCA.crt
WORKDIR /src
# 파이썬 임포트가 작동되도록 설정
ENV PYTHONPATH "${PYTHONPATH}:/src"
ENTRYPOINT ["python"]
CMD ["app.py"]

로컬 테스트를 위해 Docker Compose를 사용하여 스테이징 서비스를 실행할 수 있습니다.

version: "3.9"

services:
  stg_service:
    build:
      context: .
      network: host
    image: stg_img:local
    container_name: stg_container
    environment:
      FLASK_APP: ${STG_SERVICE_APP_NAME:-stg_service}
      DEBUG: ${STG_SERVICE_DEBUG:-True}
      KAFKA_HOST: ${KAFKA_HOST}
      KAFKA_PORT: ${KAFKA_PORT}
      KAFKA_CONSUMER_USERNAME: ${KAFKA_CONSUMER_USERNAME}
      KAFKA_CONSUMER_PASSWORD: ${KAFKA_CONSUMER_PASSWORD}
      KAFKA_CONSUMER_GROUP: ${KAFKA_CONSUMER_GROUP}
      KAFKA_SOURCE_TOPIC: ${KAFKA_SOURCE_TOPIC}
      KAFKA_DESTINATION_TOPIC: ${KAFKA_DESTINATION_TOPIC}
      KAFKA_PRODUCER_USERNAME: ${KAFKA_PRODUCER_USERNAME}
      KAFKA_PRODUCER_PASSWORD: ${KAFKA_PRODUCER_PASSWORD}
      REDIS_HOST: ${REDIS_HOST}
      REDIS_PORT: ${REDIS_PORT}
      REDIS_PASSWORD: ${REDIS_PASSWORD}
      PG_HOST: ${PG_HOST}
      PG_PORT: ${PG_PORT}
      PG_DB_NAME: ${PG_DB_NAME}
      PG_USER: ${PG_USER}
      PG_PASSWORD: ${PG_PASSWORD}
    network_mode: "bridge"
    ports:
      - "5101:5000"
    restart: unless-stopped

.env 파일:

KAFKA_HOST=******.mdb.yandexcloud.net
KAFKA_PORT=9091
KAFKA_CONSUMER_USERNAME=producer_consumer
KAFKA_CONSUMER_PASSWORD=******
KAFKA_CONSUMER_GROUP=test-consumer1
KAFKA_SOURCE_TOPIC=dds_input_topic
KAFKA_PRODUCER_USERNAME=producer_consumer
KAFKA_PRODUCER_PASSWORD=*******
KAFKA_DESTINATION_TOPIC=cdm_input_topic

REDIS_HOST=******.mdb.yandexcloud.net
REDIS_PORT=6380
REDIS_PASSWORD=******
PG_HOST=********.mdb.yandexcloud.net
PG_PORT=6432
PG_DB_NAME=sprint9dwh
PG_USER=yandex_pg
PG_PASSWORD=**********

HELM 차트

Chart.yaml 파일:

apiVersion: v2
name: first-service
description: 쿠버네티스용 헬름 차트

# 차트는 'application' 또는 'library' 차트 중 하나일 수 있습니다.
#
# Application 차트는 템플릿 모음이며 버전이 지정된 아카이브로 패키지화하여 배포될 수 있습니다.
#
# Library 차트는 차트 개발자를 위한 유용한 유틸리티 또는 함수를 제공합니다. Application 차트의 종속성으로 포함되어
# 렌더링 파이프라인에 이러한 유틸리티와 함수를 삽입합니다. Library 차트는 템플릿을 정의하지 않으며 따라서 배포될 수 없습니다.
type: application
# 이것은 차트 버전입니다. 이 번호는 차트 및 해당 템플릿에 변경이 있을 때마다 증가해야 합니다.
# 버전은 Semantic Versioning (https://semver.org/)을 따르는 것으로 예상됩니다.
version: 0.1.0
# 이것은 배포되는 애플리케이션의 버전 번호입니다. 이 버전 번호는 애플리케이션에 변경이 있는 경우마다 증가해야 합니다.
# 버전은 Semantic Versioning을 따르지 않습니다. 애플리케이션이 사용 중인 버전을 반영해야 합니다.
# 따옴표와 함께 사용하는 것이 권장됩니다.
appVersion: "1.16.0"

values.yaml 파일:

# 앱의 기본 값.
# 이것은 YAML 형식의 파일입니다.
# 템플릿에 전달할 변수를 선언합니다.

replicaCount: 3
image:
  # 컨테이너 레지스트리에 대한 링크. 야н덱스 클라우드에서 실행할 것입니다.
  repository: cr.yandex/crpr6naar69761ehm0bp/stg_service
  pullPolicy: IfNotPresent
  # 기본적으로 차트 appVersion인 이미지 태그를 덮어씁니다.
  tag: "v2022-12-13-r1"
containerPort: 5000
config:
  KAFKA_HOST: rc1a-hins1kp5qsfnsob3.mdb.yandexcloud.net
  KAFKA_PORT: "9091"
  KAFKA_CONSUMER_USERNAME: producer_consumer
  KAFKA_CONSUMER_PASSWORD: "*****"
  KAFKA_CONSUMER_GROUP: test-consumer1
  KAFKA_SOURCE_TOPIC: order-service_orders
  KAFKA_PRODUCER_USERNAME: producer_consumer
  KAFKA_PRODUCER_PASSWORD: "*****"
  KAFKA_DESTINATION_TOPIC: dds_topic_name
  REDIS_HOST: c-c9qeltiiu2rkcr6v9net.rw.mdb.yandexcloud.net
  REDIS_PORT: "6380"
  REDIS_PASSWORD: "*****"
  PG_HOST: rc1b-4olk4uzgdrdte114.mdb.yandexcloud.net
  PG_PORT: "6432"
  PG_DB_NAME: sprint9dwh
  PG_USER: yandex_pg
  PG_PASSWORD: "*****"
imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
podAnnotations: {}
resources:
  # 일반적으로 기본 리소스를 지정하지 않고 사용자의 명시적인 선택으로 유지하는 것을 권장합니다.
  # 이것은 Minikube와 같은 리소스가 적은 환경에서 차트 실행 기회를 높이기도 합니다.
  # 리소스를 지정하려면 아래 줄 주석 처리를 해제하고 필요에 맞게 조정한 다음 'resources:' 뒤의 중괄호를 제거하세요.
  limits:
    cpu: 100m
    memory: 128Mi
  requests:
    cpu: 100m
    memory: 128Mi

templates/configmap.yaml은 우리 서비스의 구성을 저장하는 k8s 엔터티입니다. values.yaml 파일의 config 블록에서 모든 키-값 쌍을 가져올 것입니다:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "app.fullname" . }}-config
  labels:
    {{- include "app.labels" . | nindent 4 }}
{{- with .Values.config }}
data:
  {{- toYaml . | nindent 2 }}
{{- end }}

templates/deployment.yaml 파일

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "app.fullname" . }}
  labels:
    {{- include "app.labels" . | nindent 4 }}
spec:
  replicas: {{ .Values.replicaCount }}
  selector:
    matchLabels:
      {{- include "app.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      {{- with .Values.podAnnotations }}
      annotations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      labels:
        {{- include "app.selectorLabels" . | nindent 8 }}
    spec:
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      containers:
        - name: {{ .Chart.Name }}
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          envFrom:
            - configMapRef:
                name: {{ include "app.fullname" . }}-config
          ports:
            - name: http
              containerPort: {{ .Values.containerPort }}
              protocol: TCP
          resources:
            {{- toYaml .Values.resources | nindent 12 }}

<!-- TIL 수평 -->
<ins class="adsbygoogle"
     style="display:block"
     data-ad-client="ca-pub-4877378276818686"
     data-ad-slot="1549334788"
     data-ad-format="auto"
     data-full-width-responsive="true"></ins>
<script>
(adsbygoogle = window.adsbygoogle || []).push({});
</script>

STG-Service의 소스 코드는 여기에서 찾을  있어요.

# DDS-Service

STG-Service 이후의 모든 것은 실제로 매우 쉬워집니다. 다른 서비스들도 거의 동일한 구조를 사용하기 때문이죠. DDS-Service의 경우, 모든 클라이언트 정의가 동일합니다. Dockerfile, Docker Compose  HELM Chart는 거의 동일합니다. 여기에서 소스 코드를 확인할  있어요.

가장  차이점은 데이터 모델링 방식에 있습니다 (Data Vault 2.0 사용합니다):

<!-- TIL 수평 -->
<ins class="adsbygoogle"
     style="display:block"
     data-ad-client="ca-pub-4877378276818686"
     data-ad-slot="1549334788"
     data-ad-format="auto"
     data-full-width-responsive="true"></ins>
<script>
(adsbygoogle = window.adsbygoogle || []).push({});
</script>

CREATE SCHEMA IF NOT EXISTS dds;

-- 데이터 보트 --
-- 허브 --
CREATE TABLE IF NOT EXISTS dds.h_user (
    h_user_pk UUID PRIMARY KEY,
    user_id   VARCHAR NOT NULL UNIQUE,
    load_dt   TIMESTAMP NOT NULL,
    load_src  VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.h_product (
    h_product_pk UUID PRIMARY KEY,
    product_id   VARCHAR NOT NULL UNIQUE,
    load_dt      TIMESTAMP NOT NULL,
    load_src     VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.h_category (
    h_category_pk UUID PRIMARY KEY,
    category_name VARCHAR NOT NULL UNIQUE,
    load_dt       TIMESTAMP NOT NULL,
    load_src      VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.h_restaurant (
    h_restaurant_pk UUID PRIMARY KEY,
    restaurant_id   VARCHAR NOT NULL UNIQUE,
    load_dt         TIMESTAMP NOT NULL,
    load_src        VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.h_order (
    h_order_pk UUID PRIMARY KEY,
    order_id   INTEGER NOT NULL UNIQUE,
    order_dt   TIMESTAMP NOT NULL,
    load_dt    TIMESTAMP NOT NULL,
    load_src   VARCHAR NOT NULL
);
-- 훗들 --
CREATE TABLE IF NOT EXISTS dds.s_user_names (
    hk_user_names_pk UUID PRIMARY KEY,
    h_user_pk        UUID NOT NULL UNIQUE REFERENCES dds.h_user (h_user_pk),
    username         VARCHAR NOT NULL,
    userlogin        VARCHAR NOT NULL,
    load_src         VARCHAR NOT NULL,
    load_dt          TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.s_product_names (
    hk_product_names_pk UUID PRIMARY KEY,
    h_product_pk        UUID NOT NULL UNIQUE REFERENCES dds.h_product (h_product_pk),
    name                VARCHAR NOT NULL,
    load_src            VARCHAR NOT NULL,
    load_dt             TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.s_restaurant_names (
    hk_restaurant_names_pk UUID PRIMARY KEY,
    h_restaurant_pk        UUID NOT NULL UNIQUE REFERENCES dds.h_restaurant (h_restaurant_pk),
    name                   VARCHAR NOT NULL,
    load_src               VARCHAR NOT NULL,
    load_dt                TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.s_order_cost (
    hk_order_cost_pk UUID PRIMARY KEY,
    h_order_pk       UUID NOT NULL UNIQUE REFERENCES dds.h_order (h_order_pk),
    cost             DECIMAL(19, 5) NOT NULL,
    payment          DECIMAL(19, 5) NOT NULL,
    load_src         VARCHAR NOT NULL,
    load_dt          TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.s_order_status (
    hk_order_status_pk UUID PRIMARY KEY,
    h_order_pk         UUID NOT NULL UNIQUE REFERENCES dds.h_order (h_order_pk),
    status             VARCHAR NOT NULL,
    load_src           VARCHAR NOT NULL,
    load_dt            TIMESTAMP NOT NULL
);
-- 링크 --
CREATE TABLE IF NOT EXISTS dds.l_order_product (
    hk_order_product_pk UUID PRIMARY KEY,
    h_order_pk          UUID NOT NULL REFERENCES dds.h_order (h_order_pk),
    h_product_pk        UUID NOT NULL REFERENCES dds.h_product (h_product_pk),
    load_src            VARCHAR NOT NULL,
    load_dt             TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.l_product_restaurant (
    hk_product_restaurant_pk UUID PRIMARY KEY,
    h_restaurant_pk          UUID NOT NULL REFERENCES dds.h_restaurant (h_restaurant_pk),
    h_product_pk             UUID NOT NULL REFERENCES dds.h_product (h_product_pk),
    load_src                 VARCHAR NOT NULL,
    load_dt                  TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.l_product_category (
    hk_product_category_pk UUID PRIMARY KEY,
    h_category_pk          UUID NOT NULL REFERENCES dds.h_category (h_category_pk),
    h_product_pk           UUID NOT NULL REFERENCES dds.h_product (h_product_pk),
    load_src               VARCHAR NOT NULL,
    load_dt                TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS dds.l_order_user (
    hk_order_user_pk UUID PRIMARY KEY,
    h_order_pk       UUID NOT NULL REFERENCES dds.h_order (h_order_pk),
    h_user_pk        UUID NOT NULL REFERENCES dds.h_user (h_user_pk),
    load_src         VARCHAR NOT NULL,
    load_dt          TIMESTAMP NOT NULL
);

우리 DDS-Service의 소스 Kafka 메시지는 STG-Service의 출력 메시지입니다.  다른 차이점은 Postgres에 이러한 메시지를 저장하는 방식에 있습니다. 데이터 보트 모델을 사용하므로 약간 복잡해집니다:

import os
import uuid
from datetime import datetime

from lib.pg.pg_connect import PgConnect
from psycopg2.extras import execute_batch

class DdsRepository:
    def __init__(self, db: PgConnect) -> None:
        self._db = db
        self._batch_size = 20
    def h_user_insert(self, user_id: str) -> None:
        with self._db.connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO dds.h_user (h_user_pk, user_id, load_dt, load_src) VALUES (%(h_user_pk)s, %(user_id)s, %(load_dt)s, %(load_src)s)
                    ON CONFLICT (user_id)
                    DO NOTHING;
                """,
                {
                    "h_user_pk": str(uuid.uuid4()),
                    "user_id": user_id,
                    "load_dt": datetime.now(),
                    "load_src": str(os.getenv('KAFKA_SOURCE_TOPIC') or "")
                })
    def s_user_names_insert(self, user_id: str, username: str, userlogin: str) -> None:
        with self._db.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT h_user_pk FROM dds.h_user WHERE user_id = '{user_id}'")
                h_user_pk = cur.fetchone()[0]
                cur.execute("""
                    INSERT INTO dds.s_user_names (hk_user_names_pk, h_user_pk, username, userlogin, load_dt, load_src) VALUES (%(hk_user_names_pk)s, %(h_user_pk)s, %(username)s, %(userlogin)s, %(load_dt)s, %(load_src)s)
                    ON CONFLICT (h_user_pk)
                    DO NOTHING;
                """,
                {
                    "hk_user_names_pk": str(uuid.uuid4()),
                    "h_user_pk": h_user_pk,
                    "username": username,
                    "userlogin": userlogin,
                    "load_dt": datetime.now(),
                    "load_src": str(os.getenv('KAFKA_SOURCE_TOPIC') or "")
                })
    def h_order_insert(self, order_id: str, order_dt: datetime) -> None:
        with self._db.connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO dds.h_order (h_order_pk, order_id, order_dt, load_dt, load_src) VALUES (%(h_order_pk)s, %(order_id)s, %(order_dt)s, %(load_dt)s, %(load_src)s)
                    ON CONFLICT (order_id)
                    DO NOTHING;
                """,
                {
                    "h_order_pk": str(uuid.uuid4()),
                    "order_id": order_id,
                    "order_dt": order_dt,
                    "load_dt": datetime.now(),
                    "load_src": str(os.getenv('KAFKA_SOURCE_TOPIC') or "")
                })
    def s_order_cost_insert(self, order_id: str, cost: float, payment: float) -> None:
        with self._db.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT h_order_pk FROM dds.h_order WHERE order_id = '{order_id}'")
                h_order_pk = cur.fetchone()[0]
                cur.execute("""
                    INSERT INTO dds.s_order_cost (hk_order_cost_pk, h_order_pk, cost, payment, load_dt, load_src) VALUES (%(hk_order_cost_pk)s, %(h_order_pk)s, %(cost)s, %(payment)s, %(load_dt)s, %(load_src)s)
                    ON CONFLICT (h_order_pk)
                    DO UPDATE SET cost = EXCLUDED.cost,
                                  payment = EXCLUDED.payment;
                """,
                {
                    "hk_order_cost_pk": str(uuid.uuid4()),
                    "h_order_pk": h_order_pk,
                    "cost": cost,
                    "payment": payment,
                    "load_dt": datetime.now(),
                    "load_src": str(os.getenv('KAFKA_SOURCE_TOPIC') or "")
                })
    def s_order_status_insert(self, order_id: str, status: str) -> None:
        with self._db.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT h_order_pk FROM dds.h_order WHERE order_id = '{order_id}'")
                h_order_pk = cur.fetchone()[0]
                cur.execute("""
                    INSERT INTO dds.s_order_status (hk_order_status_pk, h_order_pk, status, load_dt, load_src) VALUES (%(hk_order_status_pk)s, %(h_order_pk)s, %(status)s, %(load_dt)s, %(load_src)s)
                    ON CONFLICT (h_order_pk)
                    DO UPDATE SET status=EXCLUDED.status;
                """,
                {
                    "hk_order_status_pk": str(uuid.uuid4()),
                    "h_order_pk": h_order_pk,
                    "status": status,
                    "load_dt": datetime.now(),
                    "load_src": str(os.getenv('KAFKA_SOURCE_TOPIC') or "")
                })
    def l_order_user_insert(self, user_id: str, order_id:

<!-- TIL 수평 -->
<ins class="adsbygoogle"
     style="display:block"
     data-ad-client="ca-pub-4877378276818686"
     data-ad-slot="1549334788"
     data-ad-format="auto"
     data-full-width-responsive="true"></ins>
<script>
(adsbygoogle = window.adsbygoogle || []).push({});
</script>

우리는 수신 메시지를 HUB, SATELLITE  LINK로 분할합니다 - 이는 우리 데이터 모델의 주요 개체입니다. 또한, 마지막 단계로, 메시지를 출력 Kafka 클러스터로 준비하여 다음 다운스트림 서비스에 전달합니다.

app.py 파일은 기본적으로 동일합니다: 서비스를 백그라운드 작업으로 실행하고 서비스의 상태를 확인하기 위한 간단한 API를 생성합니다.

# CDM-Service

다른 서비스들과 유사하게 작동하며 여기에서 확인할  있습니다.

<!-- TIL 수평 -->
<ins class="adsbygoogle"
     style="display:block"
     data-ad-client="ca-pub-4877378276818686"
     data-ad-slot="1549334788"
     data-ad-format="auto"
     data-full-width-responsive="true"></ins>
<script>
(adsbygoogle = window.adsbygoogle || []).push({});
</script>

# 스택아데믹 🎓

끝까지 읽어주셔서 감사합니다. 떠나시기 전에:

- 작가를 클랩하고 팔로우해주시면 감사하겠습니다! 👏
- 저희를 팔로우해주세요: X | LinkedIn | YouTube | Discord
- 저희 다른 플랫폼도 방문해주세요: In Plain English | CoFeed | Differ
- 스택아데믹닷컴에서  많은 콘텐츠를 만나보세요