Синхронизация сообщений для офлайн пользователей в группах

This commit is contained in:
RoyceDa
2026-02-23 17:30:15 +02:00
parent b0f0986e0d
commit 773659c2ba
4 changed files with 133 additions and 43 deletions

View File

@@ -3,15 +3,20 @@ package im.rosetta.database.entity;
import java.util.ArrayList;
import java.util.List;
import im.rosetta.database.CreateUpdateEntity;
import im.rosetta.database.converters.StringListConverter;
import org.hibernate.annotations.Cascade;
import org.hibernate.annotations.CascadeType;
import im.rosetta.database.CreateUpdateEntity;
import jakarta.persistence.CollectionTable;
import jakarta.persistence.Column;
import jakarta.persistence.Convert;
import jakarta.persistence.ElementCollection;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.OrderColumn;
import jakarta.persistence.Table;
/**
@@ -28,13 +33,20 @@ public class Group extends CreateUpdateEntity {
@Column(name = "groupId")
private String groupId;
@Convert(converter = StringListConverter.class)
@Column(name = "membersPublicKeys", nullable = false, columnDefinition = "TEXT")
private List<String> membersPublicKeys = new ArrayList<>();
@Convert(converter = StringListConverter.class)
@Column(name = "bannedPublicKeys", nullable = false, columnDefinition = "TEXT")
private List<String> bannedPublicKeys = new ArrayList<>();
@ElementCollection(fetch = FetchType.LAZY)
@CollectionTable(name = "group_members", joinColumns = @JoinColumn(name = "group_id"))
@Column(name = "public_key", nullable = false, columnDefinition = "TEXT")
@OrderColumn(name = "order_index")
@Cascade({ CascadeType.ALL, CascadeType.DELETE_ORPHAN })
private List<String> membersPublicKeys = new ArrayList<>();
@ElementCollection(fetch = FetchType.LAZY)
@CollectionTable(name = "group_banned", joinColumns = @JoinColumn(name = "group_id"))
@Column(name = "public_key", nullable = false, columnDefinition = "TEXT")
@OrderColumn(name = "order_index")
@Cascade({ CascadeType.ALL, CascadeType.DELETE_ORPHAN })
private List<String> bannedPublicKeys = new ArrayList<>();
public Long getId() {
return id;

View File

@@ -1,8 +1,12 @@
package im.rosetta.database.repository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.hibernate.Hibernate;
import im.rosetta.database.QuerySession;
import im.rosetta.database.Repository;
import im.rosetta.database.entity.Group;
@@ -18,11 +22,16 @@ public class GroupRepository extends Repository<Group> {
* @return список публичных ключей участников группы
*/
public List<String> findGroupMembers(String groupId) {
Group group = this.findByField("groupId", groupId);
if(group == null) {
try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
put("groupId", groupId);
}})){
Group group = querySession.getQuery().uniqueResult();
if(group != null) {
Hibernate.initialize(group.getMembersPublicKeys());
return group.getMembersPublicKeys();
}
return new ArrayList<>();
}
return group.getMembersPublicKeys();
}
/**
@@ -45,7 +54,18 @@ public class GroupRepository extends Repository<Group> {
* @return группа с заданным id, или null, если группа не найдена
*/
public Group getGroup(String groupId) {
return this.findByField("groupId", groupId);
String hql = "FROM Group g WHERE g.groupId = :groupId";
HashMap<String, Object> params = new HashMap<>();
params.put("groupId", groupId);
try (QuerySession<Group> qs = this.buildQuery(hql, params)) {
Group group = qs.getQuery().uniqueResult();
if (group != null) {
Hibernate.initialize(group.getMembersPublicKeys());
Hibernate.initialize(group.getBannedPublicKeys());
}
return group;
}
}
/**
@@ -65,13 +85,18 @@ public class GroupRepository extends Repository<Group> {
* @param memberPublicKey публичный ключ участника, которого нужно добавить в группу
*/
public void addMemberToGroup(String groupId, String memberPublicKey) {
Group group = this.findByField("groupId", groupId);
if(group != null) {
List<String> membersPublicKeys = group.getMembersPublicKeys();
if(!membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.add(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.update(group);
try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
put("groupId", groupId);
}})){
Group group = querySession.getQuery().uniqueResult();
if(group != null) {
Hibernate.initialize(group.getMembersPublicKeys());
List<String> membersPublicKeys = group.getMembersPublicKeys();
if(!membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.add(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.update(group);
}
}
}
}
@@ -82,13 +107,18 @@ public class GroupRepository extends Repository<Group> {
* @param memberPublicKey публичный ключ участника, которого нужно удалить из группы
*/
public void removeMemberFromGroup(String groupId, String memberPublicKey) {
Group group = this.findByField("groupId", groupId);
if(group != null) {
List<String> membersPublicKeys = group.getMembersPublicKeys();
if(membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.remove(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.update(group);
try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
put("groupId", groupId);
}})){
Group group = querySession.getQuery().uniqueResult();
if(group != null) {
Hibernate.initialize(group.getMembersPublicKeys());
List<String> membersPublicKeys = group.getMembersPublicKeys();
if(membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.remove(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.update(group);
}
}
}
}
@@ -99,20 +129,39 @@ public class GroupRepository extends Repository<Group> {
* @param memberPublicKey публичный ключ участника, которого нужно забанить в группе
*/
public void banMemberInGroup(String groupId, String memberPublicKey) {
Group group = this.findByField("groupId", groupId);
if(group != null) {
List<String> bannedPublicKeys = group.getBannedPublicKeys();
List<String> membersPublicKeys = group.getMembersPublicKeys();
if(membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.remove(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
put("groupId", groupId);
}})){
Group group = querySession.getQuery().uniqueResult();
if(group != null) {
Hibernate.initialize(group.getBannedPublicKeys());
List<String> bannedPublicKeys = group.getBannedPublicKeys();
if(!bannedPublicKeys.contains(memberPublicKey)) {
bannedPublicKeys.add(memberPublicKey);
group.setBannedPublicKeys(bannedPublicKeys);
this.update(group);
}
}
if(!bannedPublicKeys.contains(memberPublicKey)) {
bannedPublicKeys.add(memberPublicKey);
group.setBannedPublicKeys(bannedPublicKeys);
}
this.update(group);
}
}
/**
* Получает все группы в которых состоит пользователь с заданным публичным ключом
* @param memberPublicKey публичный ключ пользователя, для которого нужно найти группы
* @return список ID групп, в которых состоит пользователь с заданным публичным ключом, или пустой список, если таких групп нет
*/
public List<String> findGroupsByMember(String memberPublicKey) {
String hql = "FROM Group g WHERE :memberPublicKey MEMBER OF g.membersPublicKeys";
List<String> groupIds = new ArrayList<>();
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("memberPublicKey", memberPublicKey);
try(QuerySession<Group> querySession = this.buildQuery(hql, parameters)){
List<Group> groups = querySession.getQuery().list();
for(Group group : groups){
groupIds.add(group.getGroupId());
}
}
return groupIds;
}
}

View File

@@ -7,6 +7,7 @@ import im.rosetta.client.ClientManager;
import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.database.repository.BufferRepository;
import im.rosetta.database.repository.GroupRepository;
import im.rosetta.packet.Packet11Typeing;
import im.rosetta.packet.base.PacketBaseDialog;
import im.rosetta.service.services.BufferService;
@@ -19,7 +20,7 @@ import io.orprotocol.packet.PacketManager;
* Такой диспетчер нужен для того, чтобы не загромождать логику обработчиков сообщений, а так же для того, чтобы
* централизовать логику отправки сообщений и сохранения их в буфер
* Например, при отправке группового сообщения, диспетчер сам достает участников группы и
* отправляет сообщение каждому участнику, а так же сохраняет сообщение в буфер для каждого участника, который офлайн
* отправляет сообщение каждому участнику
*/
public class MessageDispatcher {
@@ -38,6 +39,7 @@ public class MessageDispatcher {
* @param packet пакет с групповым сообщением
*/
public void sendGroup(PacketBaseDialog packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException {
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey();
List<String> groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", ""));
if(groupMembersPublicKeys.isEmpty()){
@@ -67,7 +69,18 @@ public class MessageDispatcher {
}
this.clientManager.sendPacketToAuthorizedPK(groupMembersPublicKeys, packet);
//TODO: Сохранить сообщение в буфер для группы, чтобы группы тоже синхронизировались
if(packet instanceof Packet11Typeing){
/**
* Если это пакет печати его не обязательно кэшировать, так как он нужен только
* для отображения статуса печати в реальном времени
*/
return;
}
/**
* Кладем пакет в буфер для будущей синхронизации и на случай если кто-то из участников оффлайн,
* в toPublicKey при отправке в группу у нас находится #group:groupId
*/
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey.replace("#group:", ""), packet);
}
/**

View File

@@ -8,6 +8,7 @@ import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.database.QuerySession;
import im.rosetta.database.entity.Buffer;
import im.rosetta.database.repository.BufferRepository;
import im.rosetta.database.repository.GroupRepository;
import im.rosetta.exception.UnauthorizedExeception;
import im.rosetta.packet.Packet7Read;
import im.rosetta.service.Service;
@@ -20,6 +21,7 @@ import io.orprotocol.packet.PacketManager;
public class BufferService extends Service<BufferRepository> {
private PacketManager packetManager;
private GroupRepository groupRepository = new GroupRepository();
public BufferService(BufferRepository repository, PacketManager packetManager) {
super(repository);
@@ -41,10 +43,24 @@ public class BufferService extends Service<BufferRepository> {
*/
throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer");
}
/**
* Получаем группы клиента, и исходя из этого формируем список каких пакетов нам нужно взять из базы
*/
List<String> clientGroups = this.groupRepository.findGroupsByMember(eciAuthentificate.getPublicKey());
List<String> toValue = new ArrayList<>();
/**
* Добавляем публичный ключ клиента, так как ему нужны пакеты, которые были отправлены ему напрямую, а не в группу
*/
toValue.add(eciAuthentificate.getPublicKey());
/**
* Добавляем группы клиента, так как ему нужны пакеты, которые были отправлены в эти группы
*/
toValue.addAll(clientGroups);
String toPublicKey = eciAuthentificate.getPublicKey();
String hql = "FROM Buffer WHERE (to = :to OR from = :from) AND timestamp > :timestamp ORDER BY timestamp ASC";
String hql = "FROM Buffer WHERE (to IN (:to) OR from = :from) AND timestamp > :timestamp ORDER BY timestamp ASC";
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("to", toPublicKey);
parameters.put("to", toValue);
parameters.put("from", toPublicKey);
parameters.put("timestamp", fromTimestampMs);
List<Packet> packets = new ArrayList<>();