실시간 퀴즈 팝업 Vue 프로젝트, 소켓 + 카프카 프로젝트 분석
프로젝트 구조 분석
프로젝트 구성
Vue 프로젝트 | 실시간 퀴즈 팝업 화면 및 소켓 클라이언트 역할로 소켓 서버와 실시간 통신 |
SpringBoot 프로젝트 | 유저 정보 검증, 소켓 서버로 입장 요청 전달, DB 저장, S3 파일 저장 등 백엔드 처리 |
SpringBoot 소켓 + 카프카 프로젝트 |
소켓 서버 역할, 네임스페이스 단위 실시간 소통 처리, 카프카 프로듀서/컨슈머 로직으로 메시지 브로커 역할 |
프로젝트 역할 요약
다른 웹 서비스에서 유저가 로그인 후 Vue 프로젝트 팝업 오픈 시 필수 파라미터들을 쿼리스트링으로 넘기면,
route.query.파라미터명으로 받아서 세션스토리지에 저장하고, 백엔드를 통해 소켓 서버에 연결합니다.
classCode를 네임스페이스로 구분하여 해당 룸으로 입장한 유저들끼리 실시간 퀴즈를 지원합니다.
Vue 프로젝트 분석
소켓 서버 URL 환경변수 설정
VITE_SOCKET_URL=http://10.10.1.87:8084
.env.서버구분 파일에 SpringBoot 소켓 서버 URL을 작성합니다.
백엔드 서버 application-서버구분.yml 파일에 socket io 정보로 작성된 포트와 동일해야 통신이 가능합니다.
유저 입장 화면 Vue 로직
<template>
</template>
<script>
import {onMounted, ref} from "vue";
import axios from "axios";
import router from "@/router/index.js";
import {useLogin} from "@/code/common/login.js";
import {useLoadingStore} from "@/stores/loadingStore.js";
import {useRoute} from "vue-router";
const route = useRoute();
onMounted(() => {
// 유저 정보 객체 생성
const objUserInfo = {
usrTpCd: route.query.usrTpCd,
userName: route.query.userName,
spaceName: route.query.spaceName,
classCode: route.query.classCode,
tempUsrId: route.query.tempUsrId,
uuid: route.query.uuid === "undefined" || route.query.uuid === undefined ? route.query.tempUsrId : route.query.uuid,
wid: route.query.wid,
stuNo: route.query.stuNo,
projectId: route.query.projectId,
boardId: route.query.boardId,
}
// 세션스토리지에 유저 정보 저장
sessionStorage.setItem("userInfo", JSON.stringify(objUserInfo));
showMsgLoading();
// 유저 객체 유효성 검사 백엔드 API 호출
axios.post(`${apiUrl}/api/classrooms/nav`, message, {
headers: { "Content-Type": "application/json" },
}).then(res=>{
if (res.status === 200) {
// 유저 입장 컴포저블 함수 호출
userLogin(res);
}
setTimeout(() => {
hideMsgLoading();
}, 1000);
}).catch(err => {
hideMsgLoading();
router.push({ path: "/404", name: "ErrorPop" });
});
});
</script>
유저가 팝업으로 초기 입장 시 세션스토리지에 유저 정보를 저장합니다.
유저 객체 유효성 검사 백엔드 API 호출 시 정상이면, 유저 입장 컴포저블 함수를 호출합니다.
유저 입장 컴포저블 함수 로직
import axios from "axios";
import router from "@/router/index.js";
import {useMainStore} from "@/stores/mainStore.js";
import {storeToRefs} from "pinia";
import {useSocket} from "@/stores/socket.js";
export function useLogin(){
const mainStore = useMainStore();
const { user } = storeToRefs(mainStore);
const socket = useSocket();
const apiUrl = import.meta.env.VITE_API_URL;
function userLogin(userInfo){
// 소켓 Namespace 생성 요청 백엔드 API 호출
axios.post(`${apiUrl}/api/classrooms/getRealRoom`, user.value)
.then((res) => {
if (!res.data) {
return false;
}
// Pinia 스토어에 유저 정보 저장
user.value = userInfo;
// 실시간 퀴즈 화면 Vue로 이동
router.push({
name: "Classroom",
params: { classCode: user.value.classCode },
})
})
.catch((err) => {
// 에러 페이지로 이동
router.push({
path: '/404',
name: "ErrorPop",
})
})
}
return {
userLogin
}
}
유저 입장 컴포저블 함수에서 classCode를 기준으로 소켓 Namespace 생성하는 백엔드 API를 호출합니다.
유저 소통 화면 Vue 로직
<template>
<div id="classroom">
<div id="classBody">
<하위컴포넌트명 />
<가이드팝업컴포넌트명 v-if="user.userType === 'TE'" />
</div>
</div>
</template>
<script setup>
import WhiteboardComp from "@/components/whiteBoard/WhiteboardComp.vue";
import ClassRoomPreview from "@/components/common/classRoomPreview.vue";
import {useSocket} from "@/stores/socket.js";
import {onMounted, onBeforeUnmount} from "vue";
const socket = useSocket();
onMounted(() => {
// 세션스토리지에서 유저 정보 조회
const userInfo = JSON.parse(sessionStorage.getItem("userInfo"));
// 현재 페이지 로드 시 이벤트 리스너 등록
// 소켓 스토어 함수 호출
window.addEventListener("load", socket.socketInit(userInfo));
})
onBeforeUnmount(()=>{
// "load" 이벤트 리스너 제거
window.removeEventListener("load", socket.socketInit);
})
</script>
유저 입장 화면에서 이동한 유저 소통 화면에서 소켓 스토어에서 소켓 초기화 함수를 호출합니다.
소켓 스토어 함수 로직
import { io } from "socket.io-client";
import { defineStore } from "pinia";
import { onBeforeUnmount, ref } from "vue";
export const useSocket = defineStore("socket", () => {
const socket = {};
const classCode = ref("");
const URL = import.meta.env.VITE_SOCKET_URL;
// 소켓 옵션 객체 정의
const socketOptions = ref({
query: { classCode },
autoConnect: false,
reconnection: true,
transports: ["websocket", "polling"],
withCredentials: true,
timeout: 5000,
reconnectionAttempts: 10,
reconnectionDelay: 2000,
});
// 소켓 클라이언트 생성 및 소켓 이벤트 리스너 등록 함수
function socketInit(userInfo) {
const getCode = userInfo.classCode;
const userName = userInfo.userName;
const userType = userInfo.usrTpCd;
sessionStorage.setItem("classCode", getCode);
sessionStorage.setItem("userType", userType);
sessionStorage.setItem("userName", userName);
classCode.value = getCode;
// 소켓 클라이언트 객체가 비어있는 경우
if (!socket[getCode]) {
// 소켓 클라이언트 객체 생성
socket[getCode] = io(`${URL}/${getCode}`, socketOptions.value);
}
// 소켓이 연결되지 않은 경우
if (!socket[getCode].connected) {
// 소켓 수동 연결
socket[getCode].connect();
// 소켓 연결 이벤트 리스너 등록
socket[getCode].on("connect", () => {
userInfo.sessionId = socket[getCode].id;
// 소켓 서버에 "itsme" 이벤트 emit
sendMessage("itsme", getCode, userInfo);
});
}
// 소켓 연결 에러 이벤트 리스너 등록
socket[getCode].on("connect_error", () => {
setTimeout(() => {
// 소켓 재연결 시도
if (socket[getCode]) socket[getCode].connect();
}, 3000);
});
// 소켓 연결 완료 이벤트 리스너 등록
socket[getCode].on("connected", (sessionId) => {
// 소켓 Namespace 입장 함수 호출
joinLeaveClassRoom("join", getCode, userType, userName, sessionId, userInfo);
});
socket[getCode].on("disconnected", (sessionId, userData) => {
console.log(`${userData.userName} 퇴장`);
});
}
// 소켓 Namespace 입장 함수
function joinLeaveClassRoom(eventName, classCode, userType, userName, sessionId, userInfo) {
const message = {
eventName,
classCode,
userType,
userName,
teacher: userType === "TE" ? userInfo : {},
student: userType === "ST" ? userInfo : {},
sessionId,
sockMessage: {}
};
// 소켓 서버에 "join" 이벤트 전송
sendMessage(eventName, classCode, message);
}
// 소켓 서버에 이벤트 전송 (공통 함수)
function sendMessage(eventName, classCode, message) {
socket[classCode]?.emit(eventName, message);
}
// 컴포전트 제거 전 호출되는 훅
onBeforeUnmount(() => {
const code = classCode.value;
if (socket[code]) {
// 소켓 클라이언트 연결 해제 및 객체 삭제
socket[code].disconnect();
delete socket[code];
}
});
return {
socket,
socketInit,
sendMessage,
joinLeaveClassRoom
};
});
Vue 소켓 io 클라이언트 라이브러리
"socket.io-client": "^4.8.1",
package.json 파일 dependencies에 정의된 소켓 클라이언트 라이브러리 정보입니다.
소켓 이벤트 전송 로직
// 미디어 공유 버튼 클릭 함수
const mediaSendMessage = (socketType, data) => {
const { user } = storeToRefs(mainStore);
const classCode = userInfo.classCode;
const sessionId = user.value.sessionId;
const userType = userInfo.usrTpCd;
const message = {
eventName: "sendMedia",
type: socketType,
classCode: classCode,
sessionId: sessionId,
userType: userType,
data: data,
};
// 소켓 서버에 "sendMedia" 이벤트 전송 (공통 함수 이용)
socketStore.sendMessage("sendMedia", classCode, message);
}
소켓 클라이언트에서 소켓 서버에 이벤트를 전송하는 공통 함수 이용 로직입니다.
SpringBoot 프로젝트 분석
Vue 프로젝트의 백엔드 프로젝트인 SpringBoot 프로젝트에서 소켓 프로젝트에 HTTP 요청할 수 있습니다.
application-서버구분.yml 설정
customSocket:
server: "http://10.10.1.87:8088"
위와 같이, 소켓 포트가 아닌 소켓 프로젝트 톰캣 포트가 작성되어 있습니다.
웹소켓 통신이 아니라 HTTP 통신을 하기 위함입니다.
유저 객체 유효성 검사 백엔드 API
@RestController
@RequestMapping("/api/classrooms")
@RequiredArgsConstructor
@Slf4j
public class ClassroomController {
@PostMapping("/nav")
public ResponseEntity<ParentDTO> callRtBoard(@RequestBody ParentDTO parentDTO) {
log.info("parentDTO : {}", parentDTO);
if (parentDTO != null) {
return ResponseEntity.ok(parentDTO);
}
return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
}
}
전달받은 유저 데이터 객체가 존재하면 로그를 남기고 반환하고 있습니다.
classroomService를 통해서 상세한 유효성 검사를 하면 더 좋을 것 같습니다.
소켓 Namespace 생성 요청 백엔드 API
컨트롤러 코드
@RestController
@RequestMapping("/api/classrooms")
@RequiredArgsConstructor
@Slf4j
public class ClassroomController {
@Value("${vue-port}")
private String url;
private final ClassroomService classroomService;
@PostMapping("/getRealRoom")
public ResponseEntity<User> getClassRoom(@RequestBody User userInfo){
ObjectMapper mapper = new ObjectMapper();
try {
String classCode = userInfo.getClassCode();
// 소켓 Namespace 생성 요청 Service 호출
boolean isOK = classroomService.sendSocketServer(userInfo);
if (isOK){
return ResponseEntity.ok(userInfo);
} else {
return ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(userInfo);
}
} catch (IllegalArgumentException e) {
log.warn("잘못된 입력값 : {}", e.getMessage());
return ResponseEntity.status(HttpStatus.BAD_REQUEST).build();
} catch (Exception e) {
log.error("알 수 없는 오류 발생", e); // stack trace 출력
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}
소켓 서버로 Namespace 생성 요청을 보내기 위한 컨트롤러입니다.
웹소켓 서버에 HTTP 요청을 보내는 서비스를 호출합니다.
서비스 코드
@Service
@RequiredArgsConstructor
@Slf4j
public class ClassroomService {
private final HttpClient httpClient = HttpClient.newHttpClient();
private final ObjectMapper objectMapper = new ObjectMapper();
@Value("${vue-port}")
private String vuePort;
@Value("${customSocket.server}")
private String socketServer;
public boolean sendSocketServer(User userInfo) {
try {
String jsonBody = objectMapper.writeValueAsString(userInfo);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(socketServer+"/socket/join"))
.POST(HttpRequest
.BodyPublishers
.ofString(jsonBody, StandardCharsets.UTF_8))
.header("Content-Type", "application/json")
.build();
// 소켓 서버 Namespace 생성 백엔드 API 호출
// 동기 방식으로 HTTP 요청 전송 (비동기 방식 : sendAsync 함수)
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
int statusCode = response.statusCode();
if (statusCode == 200) {
return true;
} else {
log.error("서버 응답 실패, 상태 코드: {}", statusCode);
return false;
}
} catch (JsonProcessingException jpe) {
log.error("웹소켓 서버로 보내기 전 User 객체 JSON 직렬화 실패");
} catch (IOException ioe) {
log.error("웹소켓 서버로 보내기 전 웹소켓 서버 통신 중 입출력 오류 발생");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // 인터럽트 플래그 복원
log.error("웹소켓 서버로 보내기 전 웹소켓 서버 통신 중 인터럽트 발생");
} catch (RuntimeException re) {
log.error("웹소켓 서버로 보내기 전 웹소켓 통신 중 예기치 못한 오류 발생");
}
return false;
}
}
소켓 서버 Namespace 생성 API를 호출하여, 유저를 입장시키기 위한 전처리 및 게이트웨이 역할을 하는 서비스입니다.
SpringBoot 소켓 + 카프카 프로젝트 분석
웹소켓, 카프카 라이브러리
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'com.corundumstudio.socketio:netty-socketio:2.0.12'
implementation 'org.springframework.kafka:spring-kafka'
}
build.gradle 파일에 netty 소켓 IO 라이브러리, 카프카 라이브러리 등이 추가되어 있습니다.
Vue URL, 소켓 서버 포트, 카프카 서버 URL 설정
// 톰캣 서버 포트 설정 (HTTP 통신용)
server:
port: 8088
error:
whitelabel:
enabled: false
include-exception: false
include-stacktrace: never
include-message: never
path: "/custom-error"
// 통신을 주고받을 Vue URL 설정
vue-port: "http://10.10.1.87:8082"
// netty 소켓 서버 포트 설정 (웹소켓 통신용)
socket:
io:
server: 0.0.0.0 // 모든 IP에서 오는 요청 수신 대기
port: 8084 // 다른 서비스와 같은 포트 사용 시 충돌 에러 발생
// 스프링 프로젝트명, 카프카 브로커 서버 목록 설정
spring:
application:
name: 프로젝트명
kafka:
bootstrap-servers: 10.10.2.7:9092,10.10.2.8:9092,10.10.2.9:9092
application-서버구분.yml 파일에 소켓 서버 포트, 통신을 주고받을 Vue URL, 소켓 포트, 카프카 브로커 서버 목록이 작성되어 있습니다.
netty 웹소켓 서버 생성
package kr.co.프로젝트명.socket.config;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.Transport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SocketIOConfig {
@Value("${socket.io.server}")
private String server;
@Value("${socket.io.port}")
private int port;
@Value("${vue-port}")
private String vuePort;
@Bean
public SocketIOServer socketIOServer() {
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setHostname(server);
config.setPort(port);
config.setMaxFramePayloadLength(1024 * 1024); // 1MB로 설정 예시
config.setEnableCors(true);
config.setAllowCustomRequests(true);
config.setOrigin("*"); // CORS 설정 : 운영 환경에서는 운영 도메인 지정 필요
config.setTransports(Transport.WEBSOCKET, Transport.POLLING);
SocketIOServer server = new SocketIOServer(config);
return server;
}
}
SocketIOConfig를 Bean으로 등록하여, 애플리케이션 실행 시 톰캣 서버와 별도로 실행되는 Netty 웹소켓 서버가 생성됩니다.
소켓 자동 실행 및 종료 처리
package kr.co.프로젝트명.socket.lifeCycle;
import com.corundumstudio.socketio.SocketIOServer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class SocketIOServerLifeCycle {
private final SocketIOServer server;
private boolean isServerRunning = false;
@PostConstruct
public void start(){
if (!isServerRunning) {
server.start();
isServerRunning = true;
log.info("소켓 서버 시작 {}", server.getConfiguration().toString());
return;
}
log.info("이미 시작중 {}", server.getConfiguration().toString());
}
@PreDestroy
public void stop(){
server.stop();
log.info("소켓 서버 DIE");
}
}
@PostConstruct, @PreDestroy 어노테이션을 이용하여 스프링부트 시작 및 종료 시 소켓이 자동 시작 및 종료되도록 설정합니다.
소켓 서버 Namespace 생성 백엔드 API
컨트롤러 코드
@RestController
@Slf4j
@RequiredArgsConstructor
@RequestMapping("/socket")
public class JoinController {
private final JoinService joinService;
@PostMapping("/join")
public ResponseEntity<User> joinClassroom(@RequestBody User userInfo){
log.info("Join Classroom : {}", userInfo);
boolean ready = joinService.joinClassroom(userInfo);
try {
if (ready){
// 소켓 Namespace 생성 성공 시 응답
return ResponseEntity.ok(userInfo);
} else {
return ResponseEntity
.status(HttpStatus.BAD_REQUEST)
.body(userInfo);
}
}
catch (Exception e){
log.error("소켓 Namespace 생성 실패");
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(null);
}
}
}
유저 입장 시 백엔드 API에서 호출하는 소켓 API 컨트롤러입니다.
서비스 코드
@Service
@Slf4j
@RequiredArgsConstructor
public class JoinService {
private final SocketIOController socketIOController;
public boolean joinClassroom(User userInfo){
try {
String classCode = userInfo.getClassCode();
// 소켓 IO 컨트롤러 함수 호출하여 Namespace 생성
socketIOController.createClassroomNamespace(classCode);
return true;
} catch (Exception e){
log.error("Namespace 생성 중 에러 발생 :: {}", e.getMessage());
return false;
}
}
}
소켓 Namespace 생성 컨트롤러를 호출하는 서비스 코드입니다.
소켓 IO 컨트롤러 코드
@Component
@Slf4j
public class SocketIOController {
private final SocketIOServer socketIOServer;
private final SocketIOService socketService;
private final NameSpaceService nameSpaceService;
private final RoomManager roomManager;
public SocketIOController(SocketIOServer socketIOServer,
SocketIOService socketService,
NameSpaceService nameSpaceService,
RoomManager roomManager) {
this.socketIOServer = socketIOServer;
this.socketService = socketService;
this.nameSpaceService = nameSpaceService;
this.roomManager = roomManager;
}
public void createClassroomNamespace(String classCode){
// NameSpace 생성 및 조회 서비스 호출
SocketIONamespace namespace = nameSpaceService.createNamespace(classCode);
log.info("참여 예정 Namespace : {}", namespace.getName());
}
}
소켓 Namespace 생성 컨트롤러 코드입니다.
NameSpace 생성 및 조회 서비스
@Service
@Slf4j
@RequiredArgsConstructor
public class NameSpaceService {
private final SocketIOServer socketIOServer;
private final SocketEventController socketEventController;
private final MediaSocketController mediaSocketController;
private final RoomController roomController;
private final RoomManager roomManager;
private Set<String> namespacesHub = ConcurrentHashMap.newKeySet();
private Set<String> connectedClients = ConcurrentHashMap.newKeySet();
/**
* Namespace 생성 및 조회 함수
*/
public SocketIONamespace createNamespace(String classCode){
// 소켓 서버에서 Namespace 조회
SocketIONamespace namespace = socketIOServer.getNamespace(classCode);
// 소켓 서버에 생성된 Namespace가 없는 경우
if (namespace == null) {
// 소켓 서버에 Namespace 생성
namespace = socketIOServer.addNamespace(classCode);
// 생성된 Namespace를 NamespaceHub에 추가
namespacesHub.add(classCode);
// 소켓 Namespace 이벤트 리스너 등록 함수 호출
setupNamespaceListeners(namespace);
}
log.info("Namespace 목록:{}", namespacesHub);
return namespace;
}
/**
* 소켓 Namespace 이벤트 리스너 등록 함수
*/
public void setupNamespaceListeners(SocketIONamespace namespace){
// 소켓 연결 이벤트 리스너 등록
namespace.addConnectListener(listenConnected(namespace));
// 소켓 연결 해제 이벤트 리스너 등록
namespace.addDisconnectListener(listenDisconnected(namespace));
// 소켓 메시지 수신 이벤트 리스너 등록
namespace.addEventListener("itsme", User.class, roomController.connectSocket());
namespace.addEventListener("join", SocketDTO.class, roomController.joinClassRoom());
namespace.addEventListener("sendMedia", Media.class, mediaSocketController.sendMedia());
}
/**
* 소켓 연결 이벤트 리스너 함수
*/
private ConnectListener listenConnected(SocketIONamespace namespace) {
return (client) -> {
String sessionId = client.getSessionId().toString();
String classCode = client.getNamespace().getName();
log.info("방금 입장한 유저 세션ID : {}", sessionId);
// 연결된 클라이언트 목록에 유저 세션ID가 있는 경우
if (connectedClients.contains(sessionId)) {
log.info("{}님은 이미 연결된 클라이언트입니다.", sessionId);
return;
}
// 연결된 클라이언트 목록에 유저 세션ID 추가
connectedClients.add(sessionId);
log.info("{} Namespace에 {}님이 연결되었습니다.", classCode, sessionId);
};
}
/**
* 클라이언트 연결 해제 리스너 함수 (브라우저 종료 및 클라이언트 연결 해제 시 호출)
*/
private DisconnectListener listenDisconnected(SocketIONamespace namespace) {
return client -> {
String roomId = client.getNamespace().getName();
String sessionId = client.getSessionId().toString();
// 연결된 소켓 클라이언트 객체에 "disconnected" 이벤트 명시
SocketDTO user = client.get("user");
if (user == null) {
log.warn("세션 {}의 user 정보가 없습니다.", sessionId);
return;
}
user.setEventName("disconnected");
// 이미 연결 해제된 세션이면 종료
if (!connectedClients.contains(sessionId)) {
return;
}
// 연결된 클라이언트 목록 및 Room에서 유저 제거
connectedClients.remove(sessionId);
client.leaveRoom(roomId);
// 유저 소켓 연결 해제 메시지 카프카 프로듀서 send 함수 호출
socketEventController.disconnectSocket(client, user);
log.info("{}님이 {} Namespace에서 연결 해제 되었습니다.", user.userName, roomId);
// 방에 아무도 없으면 네임스페이스 제거
RoomOperations roomOps = namespace.getRoomOperations(roomId);
if (roomOps != null && roomOps.getClients().isEmpty()) {
log.info("유저 퇴장 후, 방에 아무도 없어서 {} Namespace를 제거합니다.", roomId);
namespacesHub.remove(roomId);
socketIOServer.removeNamespace(roomId);
}
};
}
}
소켓 Namespace 생성 및 조회, 소켓 이벤트 리스너 등록 로직이 있는 서비스입니다.
입장하려는 Namespace가 없는 경우에만 신규 생성됩니다.
소켓 메시지 DTO
@ToString
@Data
@NoArgsConstructor
public class SocketDTO {
public String eventName;
public String classCode;
public String userType;
public String userName;
public String sessionId;
public Teacher teacher;
public Student student;
public Map<String, Object> sockMessage;
@JsonCreator
public SocketDTO(@JsonProperty("eventName") String eventName,
@JsonProperty("classCode") String classCode,
@JsonProperty("userType") String userType,
@JsonProperty("userName") String userName,
@JsonProperty("sessionId") String sessionId,
@JsonProperty("teacher") Teacher teacher,
@JsonProperty("student") Student student,
@JsonProperty("sockMessage") Map<String,Object> sockMessage) {
this.eventName = eventName;
this.classCode = classCode;
this.userType = userType;
this.userName = userName;
this.sessionId = sessionId;
this.teacher = teacher;
this.student = student;
this.sockMessage = sockMessage;
}
}
@Getter
@Setter
@ToString
public class Media extends SocketDTO {
private String type;
private Map<String,Object> data;
@JsonCreator
public Media(@JsonProperty("eventName") String eventName,
@JsonProperty("type") String type,
@JsonProperty("userType") String userType,
@JsonProperty("teacher") Teacher teacher,
@JsonProperty("student") Student student,
@JsonProperty("data") Map<String,Object> data,
@JsonProperty("classCode") String classCode) {
super.eventName = eventName;
super.classCode = classCode;
super.userType = userType;
this.teacher = teacher;
this.student = student;
this.data = data;
this.type = type;
}
}
소켓 이벤트 리스너 등록 시 DTO 클래스 타입을 지정할 수 있습니다.
소켓으로 메시지 emit 시, 소켓 서버에서 JSON payload를 DTO로 자동 매핑합니다.
소켓 Room 입장 백엔드 처리
소켓 입장 이벤트 리스너 콜백 함수
@Component
@Slf4j
@RequiredArgsConstructor
public class RoomController {
private final RoomService roomService;
/**
* Vue에서 소켓 "connect" 메시지 수신 시 emit 하는 "itsme" 이벤트 콜백 함수
*/
public DataListener<User> connectSocket() {
return (client, data, ackSender) -> {
roomService.connectUser(client, data);
};
}
/**
* Vue에서 소켓 "connected" 메시지 수신 시 emit 하는 "join" 이벤트 콜백 함수
*/
public DataListener<SocketDTO> joinClassRoom() {
return (client, data, ackSender) -> {
roomService.devidedUser(client, data);
};
}
}
소켓 Room 입장 처리는 devidedUser 함수에 작성되어 있습니다.
소켓 입장 관련 서비스 코드
@Service
@Slf4j
@RequiredArgsConstructor
public class RoomService {
private final SocketIOServer socketIOServer;
private final RoomManager roomManager;
private final MessageProducer messageProducer;
/**
* 소켓 클라이언트 연결 메시지 카프카 전송
*/
public void connectUser(SocketIOClient client, User user) {
String roomId = "/" + user.getClassCode();
messageProducer.connectMsg(roomId, user);
}
/**
* Namespace 내 Room에 클라이언트 입장 후
* 유저 입장 관련 정보 카프카 전송
*/
public void devidedUser(SocketIOClient client, SocketDTO data) {
try {
String roomId = "/" + data.getClassCode();
String userType = data.getUserType();
String sessionId = data.getSessionId();
// 유저 타입이 선생님인 경우, 신규 룸 등록
if(userType.equals("TE")){
roomManager.addRoom(roomId);
}
// 소켓 Room에 소켓 클라이언트 입장
client.joinRoom(roomId);
// 소켓 입장 메시지 카프카 전송
messageProducer.socketMsg(roomId, data);
// 소켓 클라이언트 객체에 유저 정보, 세션ID 저장
client.set("user",data);
client.set("sessionId",sessionId);
log.info("현재 client 객체 = {}", (SocketDTO) client.get("user"));
int size = client.getNamespace().getRoomOperations(roomId).getClients().size();
log.info("현재 {} 방에 있는 사람 수: {}명",client.getNamespace().getRoomOperations(roomId), size);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
Vue에서 소켓 “join” 이벤트 emit 시 devidedUser 함수가 호출됩니다.
소켓 클라이언트를 소켓 Namespace 내부 Room에 입장하고, 입장 메시지를 프로듀서로 전송합니다.
카프카 프로듀서 관련 로직
미디어 공유 컨트롤러 코드
@Component
@Slf4j
@RequiredArgsConstructor
public class MediaSocketController {
private final MediaSocketService mediaSocketService;
public DataListener<Media> sendMedia (){
return ((client, data, ackSender) -> {
mediaSocketService.processMedia(client, data);
});
}
}
소켓 이벤트 리스너에 등록된 콜백 함수에서 카프카 프로듀서 관련 서비스를 호출합니다.
미디어 공유 서비스 코드
@Service
@Slf4j
@RequiredArgsConstructor
public class MediaSocketService {
private final SocketIOServer socketIOServer;
private final MessageProducer messageProducer;
/**
* 카프카 프로듀서 호출 함수
*/
public void processMedia(SocketIOClient client, Media media) {
try {
String roomId = "/" + media.getClassCode();
messageProducer.mediaMsg(roomId, media);
} catch (Exception e){
log.error(e.getMessage());
}
}
/**
* 카프카 컨슈머에서 호출하는 함수
*/
public void tcrSend(String roomId, Media media){
SocketIONamespace namespace = socketIOServer.getNamespace(roomId);
BroadcastOperations broadcastOperations = namespace.getRoomOperations(roomId);
for (SocketIOClient client : broadcastOperations.getClients()){
if (!client.getSessionId().toString().equals(media.getSessionId())){
// 소켓 Namespace 내 Room에 속한 소켓 클라이언트들에게 이벤트 전송
client.sendEvent(media.getEventName(), media);
}
}
}
}
카프카 프로듀서 클래스 함수를 호출하거나, 컨슈머 클래스 함수에서 호출하는 서비스 코드입니다.
카프카 프로듀서 코드
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageProducer {
// application-서버구분.properties 파일 bootstrap-servers 옵션을 참조하여 프로듀서 생성
private final KafkaTemplate<String, User> messageKafkaTemplateUser;
private final KafkaTemplate<String, SocketDTO> messageKafkaTemplateSocketDTO;
private final KafkaTemplate<String, Media> messageKafkaTemplateMedia;
/**
* 소켓 클라이언트 연결 메시지 카프카 전송
*/
public void connectMsg(String roomId, User user){
ProducerRecord<String, User> record = new ProducerRecord<>("user-topic", roomId, user);
messageKafkaTemplateUser.send(record);
}
/**
* 유저 입장 관련 정보 카프카 전송
*/
public void socketMsg(String roomId, SocketDTO socketDTO){
ProducerRecord<String, SocketDTO> record = new ProducerRecord<>("socket-topic", roomId, socketDTO);
messageKafkaTemplateSocketDTO.send(record);
}
/**
* 미디어 공유 메시지 카프카 전송
*/
public void mediaMsg(String roomId, Media media){
ProducerRecord<String, Media> record = new ProducerRecord<>("media-topic", roomId, media);
messageKafkaTemplateMedia.send(record);
}
}
카프카 프로듀서 클래스에서 KafkaTemplate을 통해 프로듀서를 자동 생성합니다.
각 토픽에 roomId가 메시지 키, 메시지 객체가 메시지 값인 레코드를 비동기 전송합니다.
카프카 프로듀서 생성 클래스
@Configuration
public class ProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapServers;
// 커스터마이징 한 producer 설정
@Bean
public Map<String,Object> customProducerConfig() {
return CommonJSONSerializer.customProducer(bootStrapServers);
}
// SocketDTO 전용
@Bean
public ProducerFactory<String, SocketDTO> messageProducerFactorySocketDTO() {
return new DefaultKafkaProducerFactory<>(customProducerConfig());
}
@Bean
public KafkaTemplate<String, SocketDTO> messageKafkaTemplateSocketDTO() {
return new KafkaTemplate<>(messageProducerFactorySocketDTO());
}
// Media 전용
@Bean
public ProducerFactory<String, Media> messageProducerFactoryMedia() {
return new DefaultKafkaProducerFactory<>(customProducerConfig());
}
@Bean
public KafkaTemplate<String, Media> messageKafkaTemplateMedia() {
return new KafkaTemplate<>(messageProducerFactoryMedia());
}
// User 전용
@Bean
public ProducerFactory<String, User> messageProducerFactoryUser() {
return new DefaultKafkaProducerFactory<>(customProducerConfig());
}
@Bean
public KafkaTemplate<String, User> messageKafkaTemplateUser() {
return new KafkaTemplate<>(messageProducerFactoryUser());
}
}
카프카 프로듀서들을 생성하는 클래스를 정의할 수 있습니다.
카프카 프로듀서 설정 유틸 클래스
public class CommonJSONSerializer {
static Map<String,Object> customProducer(String bootstrapServers){
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
return props;
}
}
카프카 프로듀서 옵션을 설정하는 유틸 클래스를 정의합니다.
카프카 컨슈머 관련 로직
카프카 컨슈머 코드
@Component
@Slf4j
public class MessageConsumer {
private final WbSocketService wbSocketService;
private final RoomManager roomManager;
private final RoomService roomService;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
private final PickerSocketService pickerSocketService;
private final MediaSocketService mediaSocketService;
private final SocketEventService socketEventService;
private final WidgetSocketService widgetSocketService;
private final NameSpaceService nameSpaceService;
private final SocketIOController socketIOController;
private final SocketIOServer socketIOServer;
private final ConsumerConfig consumerConfig;
/**
* 카프카 컨슈머 클래스 생성자
*/
public MessageConsumer(WbSocketService wbSocketService,
RoomManager roomManager,
RoomService roomService,
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
PickerSocketService pickerSocketService,
MediaSocketService mediaSocketService,
SocketEventService socketEventService,
WidgetSocketService widgetSocketService,
NameSpaceService nameSpaceService,
SocketIOController socketIOController,
SocketIOServer socketIOServer,
ConsumerConfig consumerConfig) {
this.wbSocketService = wbSocketService;
this.roomManager = roomManager;
this.roomService = roomService;
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
this.pickerSocketService = pickerSocketService;
this.mediaSocketService = mediaSocketService;
this.socketEventService = socketEventService;
this.widgetSocketService = widgetSocketService;
this.nameSpaceService = nameSpaceService;
this.socketIOController = socketIOController;
this.socketIOServer = socketIOServer;
this.consumerConfig = consumerConfig;
}
/**
* "user-topic" 컨슈머
*/
@KafkaListener(
id = "user",
topics = "user-topic",
groupId = "#{@consumerConfig.genGroupId()}",
containerFactory = "userKafkaListenerContainerFactory")
public void consumeUserMsg(ConsumerRecord<String, User> record) {
try {
String roomId = record.key();
User newUser = record.value();
log.info("유저 커넥트 메시지: {}",record.value());
SocketIONamespace namespace = nameSpaceService.findNameSpace(roomId);
log.info("다른 소켓 서버에 추가된 namespace :: {}", namespace.getName());
// 모든 방에 send
roomService.sendConnectMsg(roomId, newUser);
} catch (Exception e) {
log.error("유저 커넥트 메시지 처리 중 오류 발생: {}", e.getMessage());
}
}
/**
* "socket-topic" 컨슈머
*/
@KafkaListener(
id = "socket",
topics = "socket-topic",
groupId = "#{@consumerConfig.genGroupId()}",
containerFactory = "socketKafkaListenerContainerFactory")
public void consumeSockMsg(ConsumerRecord<String, SocketDTO> record) {
try {
String eventName = record.value().getEventName();
String roomId = record.key();
if (eventName.equals("join")) {
log.info("조인 {} ", eventName);
roomService.joinLeaveMessageHandler(roomId, record.value());
} else if (eventName.equals("disconnected")) {
log.info("연결해제 consume {} ", eventName);
roomService.sendDisconnectMsg(roomId, record.value());
} else {
log.info("컨트롤 {} ", eventName);
socketEventService.controlMessageHandler(roomId, record.value());
}
log.info("Socket 메시지: {}", record.value());
} catch (Exception e) {
log.error("Socket 메시지 처리 중 오류 발생: {}", e.getMessage());
}
}
/**
* "media-topic" 컨슈머
*/
@KafkaListener(
id = "media",
topics = "media-topic",
groupId = "#{@consumerConfig.genGroupId()}",
containerFactory = "mediaKafkaListenerContainerFactory")
public void consumeMediaMsg(ConsumerRecord<String, Media> record) {
try {
String roomId = record.key();
// 소켓 클라이언트들에게 이벤트 전송하는 함수 호출
mediaSocketService.tcrSend(roomId, record.value());
log.info("Media 메시지: {}", record.value());
} catch (Exception e) {
log.error("Media 메시지 처리 중 오류 발생: {}", e.getMessage());
}
}
}
@KafkaListener으로 인해, 컨슈머 클래스 선언만으로도 Spring kafka가 토픽에서 메시지를 읽어옵니다.
카프카 컨슈머 생성 클래스
@Configuration
@Slf4j
@RequiredArgsConstructor
public class ConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// 공통
@Bean
public Map<String,Object> customConsumerConfig(){
String groupId = genGroupId();
return CommonJSONDeserializer.customConsumer(bootstrapServers, groupId);
}
// 동적 groupId 생성
public String genGroupId() {
try {
// 서버 호스트명을 기반으로 groupId 생성
return "프로젝트명-" + InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
// 호스트명을 가져올 수 없다면 랜덤 UUID 사용
return "프로젝트명-" + UUID.randomUUID();
}
}
@Bean
public StringJsonMessageConverter jsonMessageConverter() {
return new StringJsonMessageConverter();
}
// SocketDTO 전용
@Bean
public ConsumerFactory<String, SocketDTO> socketConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(customConsumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SocketDTO> socketKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SocketDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(socketConsumerFactory());
factory.setConcurrency(25);
return factory;
}
// Media 전용
@Bean
public ConsumerFactory<String, Media> mediaConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(customConsumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Media> mediaKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Media> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(mediaConsumerFactory());
factory.setConcurrency(25);
return factory;
}
// user 전용
@Bean
public ConsumerFactory<String, User> userConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(customConsumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(userConsumerFactory());
factory.setConcurrency(25);
return factory;
}
}
카프카 컨슈머들을 생성하는 클래스를 정의할 수 있습니다.
카프카 컨슈머 설정 유틸 클래스
public class CommonJSONDeserializer {
static Map<String, Object> customConsumer(String bootStrapServer, String dynamicGroupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, dynamicGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return props;
}
}
카프카 컨슈머 옵션을 설정하는 유틸 클래스를 정의합니다.