기술, 개발/RabbitMQ

RabbitMQ 개념

Jaejin Sim 2025. 9. 10. 22:57
반응형

 RabbitMQ 는 AMQP를 구현한 미들웨어 다양한 언어를 지원하여 인기가 많다

RabbitMQ 예제코드를 작성하고 테스트하면서 제 나름대로 이해하기위해 정리하였습니다. 문서에 대한 설명이 빈약할 수 있으니 양해 바랍니다.

AMQP

  • 메세지 지향 미들 웨어를 위한 표준 응용 계층 프로토콜

구조

크게 보면 3개로 구분 지을 수 있다

프로듀서 (P)

  • 메세지를 큐에 보낸다

컨슈머 (C)

  • 큐에 있는 메세지를 받는다

큐 (hello)

  • 메세지를 담는다
  • 큐는 이름을 정할 수 있다 (hello)

메소드 설명

이름  설명
queue_declare() 큐가 없으면 생성
exchange_declare() exchange 없으면 생성
queue_bind() exchange 와 큐를 연결함
basic_publish() 메세지 큐에 전달
basic_consume() 큐에서 메세지 받기

메세지를 큐에 보낸다 (producer)

  • 메세지를 담을 큐가 있어야 한다.
  • 큐가 없으면 queue_declare를 호출 하여 큐를 생성 한다.
  • basic_publish() 사용하여 메세지를 큐에 보낸다.
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\\Connection\\AMQPStreamConnection;
use PhpAmqpLib\\Message\\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\\n";

$channel->close();
$connection->close();
?>

queue_declare() 인수 설명

  • 큐가 존재하지 않으면 큐를 생성한다
// hello 라는 이름의 큐 선언
/*
name: 큐 이름 (hello)
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
*/
$channel->queue_declare('hello', false, false, false, false);

인수  설명  비고
name 큐 이름 amq로 시작하는 큐이름은 생성 할 수 없다
passive true : hello 라는 큐가 있으면 Queue.declare_ok 없으면 오류
false : hello 라는 큐가 없으면 생성해줌
 
durable ture : RabbitMQ 서버가 다시 시작 되도 큐는 살아있음
false : 서버가 죽으면 큐도 삭제됨
 
exclusive true : 배타적 큐 선언 (임시큐)
프로세스 (프로듀서, 컨슈머)가 종료되면 큐 삭제됨
독립적인 큐 혼자 쓸건지
프로듀서는 단발성으로 프로세스가 바로 종료 되기때문에
배타적큐 선언은 컨슈머에서 선언해야함
auto_delete true : 컨슈머의 채널이 닫혔을 때 큐 삭제됨

false : 컨슈머의 채널이 닫혀도 큐는 삭제 안됨
 
  • 큐 이름이 빈 값 이면 이름을 자동으로 생성함. ex)amq.gen-JzTY20BRgKO-HjmUJj0wLg
  • 최초 실행하면 RabbitMQ에 hello 라는 큐가 없기때문에 큐를 먼저 생성해줍니다.
  • 어드민에서 hello 라는 큐가 생성된 걸 볼 수 있음

참고!!

// 처음에 아래와 같이 hello 큐를 생성
$channel->queue_declare('hello', false, false, false, false);

// 만약 옵션 값을 변경 한다면 ??
$channel->queue_declare('hello', false, true, false, false); // 오류 발생!!!

이미 hello 라는 큐가 있어서 오류 발생함.
이럴 때는 큐이름을 변경 하던가, 어드민에서 hello 큐를 삭제 해야 함

basic_publish() 인수 설명

basic_publish에서 직접! 큐로 메세지를 담을 수 없다.

큐에 메세지를 담기 위해선 exchange를 사용 해야함

// 큐에 메세지를 보낸다
$channel->basic_publish($msg, '', 'hello');
인수  설명
$msg 큐에 담길 메세지
‘’ exchange 이름
hello route key
exchange 이름을 지정하지 않으면 exchange 는 default exchange 를 사용한다
default exchange 는 direct 방식이며, route key 는 큐 이름과 동일하다

default exchange

큐에 있는 메세지를 받는다 (Consumer)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\\Connection\\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\\n";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\\n";
    $msg->ack();
};

$channel->basic_consume('hello', 'ConsumerTag', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

basic_consume() 인수 설명

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See <https://www.rabbitmq.com/confirms.html> for details.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait:
    callback: A PHP Callback
*/
$channel->basic_consume('hello', 'ConsumerTag', false, false, false, false, $callback);

 

인수  설명
queue 사용할 큐 이름 지정
consumer_tag 컨슈머 식별자 (단순 텍스트).
빈 값이면 자동생성
no_local true : 메시지를 게시한 연결로 메시지를 보내지 않습니다.
(no_local 플래그는 RabbitMQ에서 지원되지 않습니다.)
no_ack true : 자동으로 ack() 처리
false : 수동으로 ack() 호출 해야함
exclusive trure: 독점 컨슈머 . 해당 컨슈머만 큐에 엑세스 할 수 있다. → 체크해보고
다른 컨슈머는 해당 큐에 접근할 수 없다
nowait async 모드로 동작할지에 대한 여부 (뭔지 모르겠음)
false일 경우, 서버에 요청하고 그에 대한 결과를 받은 이후에 함수가 종료
true : 서버에 요청하고, 즉시 함수가 종료
callback 콜백 함수 → 여기서 메세지 받기 처리함

ack() ?

$msg→ack()

 

  • 메세지를 정상적으로 받았는지 컨슈머는 RabbitMQ 서버에 ok 응답 해줘야 함
  • ack() 호출 하지 않으면 작업은 처리 했는데 unacked 임

  • ack 는 ? 정상적으로 처리 완료 되었어요. 하고 RabbitMQ에 알려 주는 것
  • 만약 컨슈머가 강제 종료 되었을 경우( ack() 호출하지 않은 메세지)
  • 다시 ready 로 넘어가서 컨슈머 재 실행 시 메세지 재 발송 처리함

Queue exchange bind

exchange 선언 (생성)

/*
    name: $exchange
    type: direct
    passive: false
    durable: true // the exchange will survive server restarts
    auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare('Route', AMQPExchangeType::DIRECT, false, true, true);

 

인수  설명
name exchange 이름
type 4개의 타입이 있음
passive true : exchange (name)가 없으면 예외 발생. 있으면 아무것도 하지 않음
false : 없으면 새로 선언
durable ture : RabbitMQ 서버가 다시 시작 되도 exchange 는 살아있음
false : 서버가 죽으면 exchange 도 삭제됨
auto_delete true : 해당 큐에 대한 바인딩이 끊기고 해당 큐를 참조 하는 컨슈머의 채널이 닫히면 exchange 는 삭제됨
큐 auto_delete = true 해줘야 역할을 제대로 할 수 있음
(어드민에서 수동으로 바인딩을 끊지 않으면 해당 exchange 는 계속 살아있음)

auto_delete 예시

/*
큐는 auto_delete = false
exchange는 auto_delete = true
*/
$channel->exchange_declare('direct_logs','direct', false,false,true);
$channel->queue_declare('testq2', false, false, false, false);
$channel->queue_bind('testq2', 'direct_logs');

큐 (testq2) 에 direact_logs exchange 바인딩 되어있음

testq2 → auto_delete = false

direact_logs → auto_delete = true

컨슈머 종료 했을 때

direct_logs excahgne 삭제 되지 않고 살아있음

수동으로 testq2 unbind 시 direct_logs 삭제된 부분 확인 가능

/*
큐는 auto_delete = true
exchange는 auto_delete = true
*/
$channel->exchange_declare('direct_logs','direct', false,false,true);
$channel->queue_declare('testq2', false, false, false, true);
$channel->queue_bind('testq2', 'direct_logs');

컨슈머 실행

컨슈머 종료

큐랑 익스체인지 둘다 삭제 되어있음

반응형