4차산업혁명의 일꾼/Java&Spring웹개발과 서버 컴퓨터

카프카와 레디스 사용법 정리

르무엘 2024. 12. 26. 04:02

1. 카프카(Kafka)

@Bean
public ProducerFactory<String, AlarmEvent> producerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory(configs);
}

@Bean
public KafkaTemplate<String, AlarmEvent> alarmEventKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

String ,Json 으로 직렬화하고 서버 주소를 설정한다. 

이것을 카프카템플릿에 담는다.

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {

    private final KafkaTemplate<Integer, AlarmEvent> alarmEventKafkaTemplate;

    @Value("${spring.kafka.topic.notification}")
    private String topic;

    public void send(AlarmEvent event) {
        alarmEventKafkaTemplate.send(topic, event.getReceiverUserId(), event);
        log.info("send fin");
    }
}

 

생산자 (Producer)역할을 하는 로직이다. KafkaTemplate으로 설정파일에 저장한 토픽에 AlarmEvent를 보낸다.

아래는 소비자(Consumer)역할을 하는 로직이다.

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final AlarmService alarmService;

    @KafkaListener(topics = "${spring.kafka.topic.notification}")
    public void consumeNotification(AlarmEvent event, Acknowledgment ack) {
        log.info("Consume the event {}", event);
        alarmService.send(event.getType(), event.getArgs(), event.getReceiverUserId());
        ack.acknowledge();
    }
}

@KafkaListener 로 토픽이름을 정하고, AlarmEvent로 메시지를 받는다. Acknowledgment(ack)로 잘 받았음을 나타낸다.

 

카프카는 토픽을 만들고, 생산하고 소비하는 부분에서 완충역할을 한다.

 

2. 레디스(Redis)

 
@Configuration
@EnableRedisRepositories
@RequiredArgsConstructor
public class RedisConfiguration {
    private final RedisProperties redisProperties;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        String redisUrl = redisProperties.getUrl();
        if (redisUrl == ""|| redisUrl.isEmpty()) {
            throw new IllegalArgumentException("Redis URL must not be or empty");
        }
        RedisURI redisURI = RedisURI.create(redisUrl);
        org.springframework.data.redis.connection.RedisConfiguration configuration = LettuceConnectionFactory.createRedisConfiguration(redisURI);
        LettuceConnectionFactory factory = new LettuceConnectionFactory(configuration);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public RedisTemplate<String, User> userRedisTemplate() {
        RedisTemplate<String, User> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<User>(User.class));
        return redisTemplate;
    }

}

Spring Data Redis와 Lettuce 라이브러리를 활용하여 Redis와의 상호작용을 설정

  • Redis Repository 활성화: @EnableRedisRepositories를 통해 Spring Data Redis 저장소를 활성화
    1. RedisTemplate: Spring에서 Redis 데이터를 직렬화 및 역직렬화하고, Redis와 상호작용하는데 사용 
    2. RedisTemplate<String, User>:
      • Redis 키는 문자열 타입
      • Redis 값은 User 객체로 설정
    3. 직렬화 구성:
      • 키를 위해 StringRedisSerializer 사용.
      • 값을 JSON 형식으로 직렬화하기 위해 Jackson2JsonRedisSerializer<User> 사용.
    4. 이 설정된 템플릿은 다른 컴포넌트에서 RedisTemplate을 주입받아 사용 

 

@Slf4j
@Repository
@RequiredArgsConstructor
public class UserCacheRepository {

    private final RedisTemplate<String, User> userRedisTemplate;

    private final static Duration USER_CACHE_TTL = Duration.ofDays(3);


    public void setUser(User user) {
        String key = getKey(user.getUsername());
        log.info("Set User to Redis {}({})", key, user);
        userRedisTemplate.opsForValue().set(key, user, USER_CACHE_TTL);
    }

    public Optional<User> getUser(String userName) {
        User data = userRedisTemplate.opsForValue().get(getKey(userName));
        log.info("Get User from Redis {}", data);
        return Optional.ofNullable(data);
    }


    private String getKey(String userName) {
        return "UID:" + userName;
    }
}

사용자 캐시 TTL(Time To Live)를 3일로 설정한다~! db 접근을 줄이고 redis캐시를 사용하여 사용자 정보를 빠르게 조회가능하다.

LIST