Удаление старых буферов, убраны dev логи, FCM уведомления
All checks were successful
Build rosetta-wss / build (push) Successful in 3m33s

This commit was merged in pull request #7.
This commit is contained in:
2026-02-25 17:25:41 +00:00
12 changed files with 233 additions and 5 deletions

7
.env
View File

@@ -11,3 +11,10 @@ PORT=3000 # Порт, на котором будет работать серве
CDN_SERVERS=http://10.211.55.2:7789
#SDU - Server Delivery Updates
SDU_SERVERS=http://10.211.55.2:7777
#Firebase Credentials
FIREBASE_CREDENTIALS_PATH=serviceAccount.json
#Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений)
BUFFER_CLEANUP_DAYS=7

1
.gitignore vendored
View File

@@ -37,4 +37,5 @@ target
build/.env
build/.env*
build/*.json

View File

@@ -34,6 +34,12 @@
<version>42.7.1</version>
</dependency>
<dependency>
<groupId>com.google.firebase</groupId>
<artifactId>firebase-admin</artifactId>
<version>9.2.0</version>
</dependency>
<!-- Jakarta Persistence API -->
<dependency>
<groupId>jakarta.persistence</groupId>

13
serviceAccount.json Normal file
View File

@@ -0,0 +1,13 @@
{
"type": "service_account",
"project_id": "rosetta-messanger-dev",
"private_key_id": "69822675ff0a49a8ce0bd147dbab3ef432963485",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC/iYhrgWfi/ecP\nIs7Y0u7MueE/leU0QZF3V6hIcs1Iq1/CSUYcszv+YXuKItfwi0pYtntPmnCtdKnv\nFgZZN/9I0/AjFPlggqEXBH223FjsAfoNTk8a0PXyF8oOG0u2pN34t20nxXFSYars\n/6yJMKOBdaJ+L51dlBeCj40UFm3zQSVpejRkspWmT5Qz7kWh2e8yj6ukPyXEfrrD\n8jmM6uQeKwzlmfjD8YbL7R2m+lE1hzkoFCo0PrqMQYIIpAMT9n9meCk/LrQMIUnA\n+zICsTffFwbFzmJ2A022po3ObL4dUYgqGrPs5DqjxwTF4mI0g2dkPvoxpUKkVzC7\nHzzOYTwRAgMBAAECggEAGIacDSa1rpdpfK2DLrjFHY+YXAPYehpX7fVaVeXmtcKa\nppkAXSYcQq9T2jUqUSGyOZgq7l9Yb0rhhFJd1L3QDBDLY9/zK3GpU/ZeF0oG1DhP\n6bVDJCwUBNXD/eNujKrMG5AimOBLlEtvpMemXa4jOa1eSxR+F4tM1AndUZr/pZFT\nIdPBrId3WemkcWH2b1R4RLVu8zqci1x+nWvL4Gt8O/gDpDzlmV2Disvpf300a5wU\ngIKcOI2H6vouOwL9ltMPTBXdpDIC64Q8zvK3973d3Fqx2Mt2mO4ZgylFpk2nmAm6\n3wknrkJDE+Z/H7XmdwpMhRmmyku3vuqsDwmy7BEGGwKBgQD03oOJ6eb9kG6JX830\nRH/Vhh3NlpVEzuHzRA8WznsX5WaeyMF9NWo4Lh9ifBxkRAQKftEC0CR5GZp3/895\nQuoMMbrrklWWkVRwCZWYQBquRXNHbTPj6ztvHaZZrw8HhemZWtOF89T5f8Kn4LcD\nFwR8kIqXeLxeTbfc2hQXZvP7vwKBgQDIPmhM2Td/Ni4Mbe3wGmOyS0RweK5qcKBF\n32N1rCr8smrD1irg+w9Qrmz2cX3udtVJDRLz4ctSmiEPs+UrzuyG6yTbdv7bh8AN\nsqHOdc4GRqGsB/jfXagVjRMSeObQk5fR03tiPLP15J+lpoEsjwUzw7L3Ioacaq62\nqSi87br8LwKBgQDu/uMJz4bJg5evcxeUSusuH5mlGE0WfIniIlJL4zoXR6qSXcUk\nDOdgb/vn5tTbM9tx1vbvNPH0VH4Ek2QPqbTANCWJWSk6LRxpwaEFmcOwxk5Or5IO\n6X/34suDCy6zHAu0xwZe3m7HGeCGc/iMBoI1heoPDyNjM5257AviD3UhBwKBgCWS\nmC17QI+NEfzhD5lSykwlFVVpP4jXUytpLBdjU7mQnLncULVgRlJkOCvRxchd4c1Q\nN7MtNeJs6zEwFxsuO3FhY8wOOunkQeQQFY5QynShAiruYANBZo2Mp/x6VQzj9MO5\nQ9h9/WJxIIeLg4dh2p8I5Ga8wrdMyTWa7frtPH2fAoGALUkccsu4Mcws55Xy1aHH\nNETI6CRQpaU4v6jNhuuY4g6jp5ScaqkpuuzHOYorpmdC3180YXzPn2pwKeYWa0BA\nTjR8z603bCpdRfpic1UvYi96jwAcNiJTAGGVwmyYopIFBdHphfGS3hWZlYfonVps\n+k87j3WGwOiUGqoKSGJi1cw=\n-----END PRIVATE KEY-----\n",
"client_email": "firebase-adminsdk-fbsvc@rosetta-messanger-dev.iam.gserviceaccount.com",
"client_id": "115421173243098464717",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-fbsvc%40rosetta-messanger-dev.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}

View File

@@ -52,7 +52,7 @@ import im.rosetta.packet.Packet6Message;
import im.rosetta.packet.Packet7Read;
import im.rosetta.packet.Packet8Delivery;
import im.rosetta.packet.Packet9DeviceNew;
import im.rosetta.service.services.BufferCleanupService;
import io.orprotocol.Server;
import io.orprotocol.Settings;
import io.orprotocol.packet.PacketManager;
@@ -73,6 +73,7 @@ public class Boot {
private ServerAdapter serverAdapter;
private ClientManager clientManager;
private OnlineManager onlineManager;
private BufferCleanupService bufferCleanupService;
/**
* Конструктор по умолчанию, использует порт 3000 для сервера
@@ -96,6 +97,14 @@ public class Boot {
30
), packetManager, this.serverAdapter);
this.clientManager = new ClientManager(server);
/**
* Каждые сколько дней будет очищаться буфер (это влияет на синхронизацию сообщений, так
* как при синхронизации клиент запрашивает пакеты из буфера за последние 7 дней, если этот параметр будет меньше,
* то клиенты не смогут синхронизировать старые сообщения)
*/
int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ?
Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7;
this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger);
}
/**
@@ -141,6 +150,7 @@ public class Boot {
this.registerAllExecutors();
this.registerAllEvents();
this.printBootMessage();
this.bufferCleanupService.start();
return this;
}catch(Exception e){
this.logger.error(Color.RED + "Booting error, stack trace:");

View File

@@ -9,4 +9,18 @@ public class BufferRepository extends Repository<Buffer> {
super(Buffer.class);
}
/**
* Удаляет все записи страше чем timestampMs
* @param timestampMs метка времени в миллисекундах, все записи с меткой времени меньше которой будут удалены
*/
public void deleteOlderThan(long timestampMs) {
this.executeInTransaction(session -> {
String hql = "DELETE FROM Buffer WHERE timestamp < :timestamp";
session.createMutationQuery(hql)
.setParameter("timestamp", timestampMs)
.executeUpdate();
return null;
});
}
}

View File

@@ -0,0 +1,93 @@
package im.rosetta.service.dispatch;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import com.google.firebase.messaging.FirebaseMessaging;
import com.google.firebase.messaging.Message;
import com.google.firebase.messaging.Notification;
import im.rosetta.database.repository.UserRepository;
import im.rosetta.service.services.UserService;
/**
* Класс для отправки push-уведомлений пользователям через Firebase Cloud Messaging (FCM).
*/
public class FirebaseDispatcher {
private UserRepository userRepository = new UserRepository();
private UserService userService = new UserService(userRepository);
public FirebaseDispatcher() {
initializeFirebase();
}
/**
* Инициализация Firebase Admin SDK
*/
private void initializeFirebase() {
if (FirebaseApp.getApps().isEmpty()) {
try {
String firebaseCredentialsPath = System.getenv("FIREBASE_CREDENTIALS_PATH");
if (firebaseCredentialsPath == null || firebaseCredentialsPath.isEmpty()) {
throw new IllegalStateException("FIREBASE_CREDENTIALS_PATH environment variable is not set");
}
FileInputStream serviceAccount = new FileInputStream(firebaseCredentialsPath);
FirebaseOptions options = FirebaseOptions.builder()
.setCredentials(GoogleCredentials.fromStream(serviceAccount))
.build();
FirebaseApp.initializeApp(options);
} catch (IOException e) {
throw new RuntimeException("Failed to initialize Firebase", e);
}
}
}
/**
* Отправляет push-уведомление пользователю с данным публичным ключом
* @param publicKey публичный ключ пользователя
* @param title заголовок уведомления
* @param message текст уведомления
*/
public void sendPushNotification(String publicKey, String title, String messageText) {
List<String> tokens = userService.getNotificationsTokens(publicKey);
if (tokens == null || tokens.isEmpty()) {
return;
}
for (String token : tokens) {
try {
Message message = Message.builder()
.setNotification(Notification.builder()
.setTitle(title)
.setBody(messageText)
.build())
.setToken(token)
.build();
String response = FirebaseMessaging.getInstance().send(message);
System.out.println("Successfully sent message: " + response);
} catch (Exception e) {
System.err.println("Failed to send notification to token: " + token + ", error: " + e.getMessage());
}
}
}
/**
* Отправляет push-уведомление нескольким пользователям
* @param publicKeys список публичных ключей пользователей
* @param title заголовок уведомления
* @param messageText текст уведомления
*/
public void sendPushNotification(List<String> publicKeys, String title, String messageText) {
for (String publicKey : publicKeys) {
sendPushNotification(publicKey, title, messageText);
}
}
}

View File

@@ -27,6 +27,7 @@ public class MessageDispatcher {
private final ClientManager clientManager;
private final BufferRepository bufferRepository = new BufferRepository();
private final BufferService bufferService;
private final FirebaseDispatcher firebaseDispatcher = new FirebaseDispatcher();
public MessageDispatcher(ClientManager clientManager, PacketManager packetManager) {
this.clientManager = clientManager;
@@ -84,6 +85,10 @@ public class MessageDispatcher {
* Отправляем сообщение всем, кто в беседе
*/
this.clientManager.sendPacketToAuthorizedPK(groupMembersPublicKeys, packet);
/**
* Отправляем PUSH уведомление
*/
this.firebaseDispatcher.sendPushNotification(groupMembersPublicKeys, "Rosetta", "New message in group");
}
/**
@@ -103,7 +108,14 @@ public class MessageDispatcher {
* чтобы синхронизировать отправленные сообщения
*/
this.clientManager.retranslate(client, packet);
/**
* Отправляем сообщение получателю
*/
this.clientManager.sendPacketToAuthorizedPK(toPublicKey, packet);
/**
* Отправляем PUSH уведомление получателю
*/
this.firebaseDispatcher.sendPushNotification(toPublicKey, "Rosetta", "New message from");
if(!bufferizationNeed){
/**

View File

@@ -0,0 +1,52 @@
package im.rosetta.service.services;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import im.rosetta.database.repository.BufferRepository;
import im.rosetta.logger.Logger;
import im.rosetta.logger.enums.Color;
public class BufferCleanupService {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final BufferRepository bufferRepository = new BufferRepository();
private final int cleanupEveryDays;
private final Logger logger;
public BufferCleanupService(int cleanupEveryDays, Logger logger) {
this.cleanupEveryDays = cleanupEveryDays;
this.logger = logger;
}
/**
* Запускает планировщик, который будет каждые 24 часа удалять из буфера
* пакеты старше чем cleanupEveryDays дней
*/
public void start() {
this.logger.info(Color.CYAN + "Sheduled cleanup buffer every " + this.cleanupEveryDays + " days");
scheduler.scheduleAtFixedRate(
this::cleanupOldPackets,
0, //стартовая задержка 0, то есть сразу при запуске сервиса будет выполнена очистка буфера
24,
TimeUnit.HOURS
);
}
/**
* Функция планировщика, которая удаляет из буфера пакеты старше чем cleanupEveryDays дней
*/
private void cleanupOldPackets() {
try {
long sevenDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(this.cleanupEveryDays);
bufferRepository.deleteOlderThan(sevenDaysAgo);
} catch (Exception e) {
e.printStackTrace();
}
}
public void stop() {
scheduler.shutdown();
}
}

View File

@@ -90,4 +90,26 @@ public class UserService extends Service<UserRepository> {
this.getRepository().update(user);
}
/**
* Получает список токенов пуш уведомлений пользователя
* @param user пользователь, у которого нужно получить список токенов пуш уведомлений
* @return список токенов пуш уведомлений пользователя
*/
public List<String> getNotificationsTokens(User user) {
return user.getNotificationsTokens();
}
/**
* Получает список токенов пуш уведомлений пользователя
* @param publicKey публичный ключ пользователя, у которого нужно получить список токенов пуш уведомлений
* @return список токенов пуш уведомлений пользователя
*/
public List<String> getNotificationsTokens(String publicKey) {
User user = this.getRepository().findByField("publicKey", publicKey);
if(user == null){
return null;
}
return user.getNotificationsTokens();
}
}

View File

@@ -149,8 +149,7 @@ public class Server extends WebSocketServer {
threadLocker.releaseLock(packet, executor.getClass());
}
} catch (Exception e) {
e.printStackTrace();
//client.disconnect(ServerFailures.BAD_PACKET);
client.disconnect(ServerFailures.BAD_PACKET);
}
}

View File

@@ -40,7 +40,6 @@ public class PacketFactory {
packet.read(stream);
return packet;
} catch (Exception e) {
e.printStackTrace();
throw new ProtocolException("Failed to create packet with id " + packetId);
}
}