Синхронизация бесед #6

Merged
Royce59 merged 4 commits from dev into main 2026-02-24 16:37:06 +00:00
11 changed files with 232 additions and 92 deletions

View File

@@ -200,7 +200,7 @@ public class Boot {
this.packetManager.registerExecutor(17, new Executor17GroupCreate()); this.packetManager.registerExecutor(17, new Executor17GroupCreate());
this.packetManager.registerExecutor(18, new Executor18GroupInfo()); this.packetManager.registerExecutor(18, new Executor18GroupInfo());
this.packetManager.registerExecutor(19, new Executor19GroupInviteInfo()); this.packetManager.registerExecutor(19, new Executor19GroupInviteInfo());
this.packetManager.registerExecutor(20, new Executor20GroupJoin()); this.packetManager.registerExecutor(20, new Executor20GroupJoin(this.packetManager));
this.packetManager.registerExecutor(21, new Executor21GroupLeave()); this.packetManager.registerExecutor(21, new Executor21GroupLeave());
this.packetManager.registerExecutor(22, new Executor22GroupBan()); this.packetManager.registerExecutor(22, new Executor22GroupBan());
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager)); this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager));

View File

@@ -53,7 +53,7 @@ public class ClientManager {
} }
/** /**
* Отправить пакет всем АВТОРИЗОВАННЫМ клиентам с публичным ключом publicKey * Отправить пакет ВСЕМ АВТОРИЗОВАННЫМ клиентам с публичным ключом publicKey
* @param publicKey публичный ключ получателя * @param publicKey публичный ключ получателя
* @param packet пакет для отправки * @param packet пакет для отправки
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту * @throws ProtocolException если произошла ошибка при отправке пакета клиенту
@@ -83,6 +83,34 @@ public class ClientManager {
} }
} }
/**
* Отправить пакет всем клиентам с публичными ключом как у client, кроме клиента client, который является отправителем и не должен получать этот пакет
* @param client клиент
* @param packet пакет для отправки
* @throws ProtocolException
*/
public void retranslate(Client client, Packet packet) throws ProtocolException{
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
HashSet<Client> clients = this.clientIndexer
.getClients(ECIAuthentificate.class, "publicKey", eciAuthentificate.getPublicKey());
if(clients == null){
/**
* Нет авторизованных сессий с таким публичным ключом
*/
return;
}
for(Client c : clients){
/**
* Проходим по всем устройствам с таким публичным ключом и ретранслируем им пакет, кроме того устройства что
* отправило пакет
*/
if(c.equals(client)){
continue;
}
c.send(packet);
}
}
/** /**
* Отправить пакет всем клиентам с публичными ключами из списка publicKeys * Отправить пакет всем клиентам с публичными ключами из списка publicKeys
* @param publicKeys список публичных ключей получателей * @param publicKeys список публичных ключей получателей

View File

@@ -3,15 +3,20 @@ package im.rosetta.database.entity;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import im.rosetta.database.CreateUpdateEntity; import org.hibernate.annotations.Cascade;
import im.rosetta.database.converters.StringListConverter; import org.hibernate.annotations.CascadeType;
import im.rosetta.database.CreateUpdateEntity;
import jakarta.persistence.CollectionTable;
import jakarta.persistence.Column; import jakarta.persistence.Column;
import jakarta.persistence.Convert; import jakarta.persistence.ElementCollection;
import jakarta.persistence.Entity; import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue; import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType; import jakarta.persistence.GenerationType;
import jakarta.persistence.Id; import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.OrderColumn;
import jakarta.persistence.Table; import jakarta.persistence.Table;
/** /**
@@ -28,13 +33,20 @@ public class Group extends CreateUpdateEntity {
@Column(name = "groupId") @Column(name = "groupId")
private String groupId; private String groupId;
@Convert(converter = StringListConverter.class)
@Column(name = "membersPublicKeys", nullable = false, columnDefinition = "TEXT")
private List<String> membersPublicKeys = new ArrayList<>();
@Convert(converter = StringListConverter.class) @ElementCollection(fetch = FetchType.LAZY)
@Column(name = "bannedPublicKeys", nullable = false, columnDefinition = "TEXT") @CollectionTable(name = "group_members", joinColumns = @JoinColumn(name = "group_id"))
private List<String> bannedPublicKeys = new ArrayList<>(); @Column(name = "public_key", nullable = false, columnDefinition = "TEXT")
@OrderColumn(name = "order_index")
@Cascade({ CascadeType.ALL, CascadeType.DELETE_ORPHAN })
private List<String> membersPublicKeys = new ArrayList<>();
@ElementCollection(fetch = FetchType.LAZY)
@CollectionTable(name = "group_banned", joinColumns = @JoinColumn(name = "group_id"))
@Column(name = "public_key", nullable = false, columnDefinition = "TEXT")
@OrderColumn(name = "order_index")
@Cascade({ CascadeType.ALL, CascadeType.DELETE_ORPHAN })
private List<String> bannedPublicKeys = new ArrayList<>();
public Long getId() { public Long getId() {
return id; return id;

View File

@@ -1,8 +1,12 @@
package im.rosetta.database.repository; package im.rosetta.database.repository;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import org.hibernate.Hibernate;
import im.rosetta.database.QuerySession;
import im.rosetta.database.Repository; import im.rosetta.database.Repository;
import im.rosetta.database.entity.Group; import im.rosetta.database.entity.Group;
@@ -18,11 +22,16 @@ public class GroupRepository extends Repository<Group> {
* @return список публичных ключей участников группы * @return список публичных ключей участников группы
*/ */
public List<String> findGroupMembers(String groupId) { public List<String> findGroupMembers(String groupId) {
Group group = this.findByField("groupId", groupId); try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
if(group == null) { put("groupId", groupId);
}})){
Group group = querySession.getQuery().uniqueResult();
if(group != null) {
Hibernate.initialize(group.getMembersPublicKeys());
return group.getMembersPublicKeys();
}
return new ArrayList<>(); return new ArrayList<>();
} }
return group.getMembersPublicKeys();
} }
/** /**
@@ -30,12 +39,9 @@ public class GroupRepository extends Repository<Group> {
* @param groupId ID группы * @param groupId ID группы
* @param creatorPublicKey публичный ключ создателя группы, который будет единственным участником группы при создании * @param creatorPublicKey публичный ключ создателя группы, который будет единственным участником группы при создании
*/ */
public void createGroup(String groupId, String creatorPublicKey) { public void createGroup(String groupId) {
Group group = new Group(); Group group = new Group();
group.setGroupId(groupId); group.setGroupId(groupId);
List<String> membersPublicKeys = new ArrayList<>();
membersPublicKeys.add(creatorPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.save(group); this.save(group);
} }
@@ -45,7 +51,18 @@ public class GroupRepository extends Repository<Group> {
* @return группа с заданным id, или null, если группа не найдена * @return группа с заданным id, или null, если группа не найдена
*/ */
public Group getGroup(String groupId) { public Group getGroup(String groupId) {
return this.findByField("groupId", groupId); String hql = "FROM Group g WHERE g.groupId = :groupId";
HashMap<String, Object> params = new HashMap<>();
params.put("groupId", groupId);
try (QuerySession<Group> qs = this.buildQuery(hql, params)) {
Group group = qs.getQuery().uniqueResult();
if (group != null) {
Hibernate.initialize(group.getMembersPublicKeys());
Hibernate.initialize(group.getBannedPublicKeys());
}
return group;
}
} }
/** /**
@@ -65,13 +82,18 @@ public class GroupRepository extends Repository<Group> {
* @param memberPublicKey публичный ключ участника, которого нужно добавить в группу * @param memberPublicKey публичный ключ участника, которого нужно добавить в группу
*/ */
public void addMemberToGroup(String groupId, String memberPublicKey) { public void addMemberToGroup(String groupId, String memberPublicKey) {
Group group = this.findByField("groupId", groupId); try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
if(group != null) { put("groupId", groupId);
List<String> membersPublicKeys = group.getMembersPublicKeys(); }})){
if(!membersPublicKeys.contains(memberPublicKey)) { Group group = querySession.getQuery().uniqueResult();
membersPublicKeys.add(memberPublicKey); if(group != null) {
group.setMembersPublicKeys(membersPublicKeys); Hibernate.initialize(group.getMembersPublicKeys());
this.update(group); List<String> membersPublicKeys = group.getMembersPublicKeys();
if(!membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.add(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.update(group);
}
} }
} }
} }
@@ -82,13 +104,18 @@ public class GroupRepository extends Repository<Group> {
* @param memberPublicKey публичный ключ участника, которого нужно удалить из группы * @param memberPublicKey публичный ключ участника, которого нужно удалить из группы
*/ */
public void removeMemberFromGroup(String groupId, String memberPublicKey) { public void removeMemberFromGroup(String groupId, String memberPublicKey) {
Group group = this.findByField("groupId", groupId); try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
if(group != null) { put("groupId", groupId);
List<String> membersPublicKeys = group.getMembersPublicKeys(); }})){
if(membersPublicKeys.contains(memberPublicKey)) { Group group = querySession.getQuery().uniqueResult();
membersPublicKeys.remove(memberPublicKey); if(group != null) {
group.setMembersPublicKeys(membersPublicKeys); Hibernate.initialize(group.getMembersPublicKeys());
this.update(group); List<String> membersPublicKeys = group.getMembersPublicKeys();
if(membersPublicKeys.contains(memberPublicKey)) {
membersPublicKeys.remove(memberPublicKey);
group.setMembersPublicKeys(membersPublicKeys);
this.update(group);
}
} }
} }
} }
@@ -99,20 +126,39 @@ public class GroupRepository extends Repository<Group> {
* @param memberPublicKey публичный ключ участника, которого нужно забанить в группе * @param memberPublicKey публичный ключ участника, которого нужно забанить в группе
*/ */
public void banMemberInGroup(String groupId, String memberPublicKey) { public void banMemberInGroup(String groupId, String memberPublicKey) {
Group group = this.findByField("groupId", groupId); try(QuerySession<Group> querySession = this.buildQuery("FROM Group g WHERE g.groupId = :groupId", new HashMap<String, Object>(){{
if(group != null) { put("groupId", groupId);
List<String> bannedPublicKeys = group.getBannedPublicKeys(); }})){
List<String> membersPublicKeys = group.getMembersPublicKeys(); Group group = querySession.getQuery().uniqueResult();
if(membersPublicKeys.contains(memberPublicKey)) { if(group != null) {
membersPublicKeys.remove(memberPublicKey); Hibernate.initialize(group.getBannedPublicKeys());
group.setMembersPublicKeys(membersPublicKeys); List<String> bannedPublicKeys = group.getBannedPublicKeys();
if(!bannedPublicKeys.contains(memberPublicKey)) {
bannedPublicKeys.add(memberPublicKey);
group.setBannedPublicKeys(bannedPublicKeys);
this.update(group);
}
} }
if(!bannedPublicKeys.contains(memberPublicKey)) {
bannedPublicKeys.add(memberPublicKey);
group.setBannedPublicKeys(bannedPublicKeys);
}
this.update(group);
} }
} }
/**
* Получает все группы в которых состоит пользователь с заданным публичным ключом
* @param memberPublicKey публичный ключ пользователя, для которого нужно найти группы
* @return список ID групп, в которых состоит пользователь с заданным публичным ключом, или пустой список, если таких групп нет
*/
public List<String> findGroupsByMember(String memberPublicKey) {
String hql = "FROM Group g WHERE :memberPublicKey MEMBER OF g.membersPublicKeys";
List<String> groupIds = new ArrayList<>();
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("memberPublicKey", memberPublicKey);
try(QuerySession<Group> querySession = this.buildQuery(hql, parameters)){
List<Group> groups = querySession.getQuery().list();
for(Group group : groups){
groupIds.add(group.getGroupId());
}
}
return groupIds;
}
} }

View File

@@ -184,7 +184,7 @@ public class Executor0Handshake extends PacketExecutor<Packet0Handshake> {
newDevicePacket.setDeviceId(deviceId); newDevicePacket.setDeviceId(deviceId);
newDevicePacket.setDeviceName(deviceName); newDevicePacket.setDeviceName(deviceName);
newDevicePacket.setDeviceOs(deviceOs); newDevicePacket.setDeviceOs(deviceOs);
newDevicePacket.setIpAddress(client.getSocket().getRemoteSocketAddress().getAddress().getHostAddress()); newDevicePacket.setIpAddress(client.getIpAddress());
clientManager.sendPacketToAuthorizedPK(publicKey, newDevicePacket); clientManager.sendPacketToAuthorizedPK(publicKey, newDevicePacket);
/** /**
* Сбрасываем клиенту все старые подтверждения устройств, чтобы исключить спам запросами * Сбрасываем клиенту все старые подтверждения устройств, чтобы исключить спам запросами

View File

@@ -25,7 +25,7 @@ public class Executor17GroupCreate extends PacketExecutor<Packet17GroupCreate> {
return; return;
} }
String groupId = RandomUtil.randomString(16); String groupId = RandomUtil.randomString(16);
this.groupRepository.createGroup(groupId, eciAuthentificate.getPublicKey()); this.groupRepository.createGroup(groupId);
/** /**
* Отправляем клиенту ид созданной группы * Отправляем клиенту ид созданной группы
*/ */

View File

@@ -3,17 +3,25 @@ package im.rosetta.executors;
import im.rosetta.Failures; import im.rosetta.Failures;
import im.rosetta.client.tags.ECIAuthentificate; import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.database.entity.Group; import im.rosetta.database.entity.Group;
import im.rosetta.database.repository.BufferRepository;
import im.rosetta.database.repository.GroupRepository; import im.rosetta.database.repository.GroupRepository;
import im.rosetta.packet.Packet20GroupJoin; import im.rosetta.packet.Packet20GroupJoin;
import im.rosetta.packet.runtime.NetworkGroupStatus; import im.rosetta.packet.runtime.NetworkGroupStatus;
import im.rosetta.service.services.BufferService;
import io.orprotocol.ProtocolException; import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client; import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor; import io.orprotocol.packet.PacketExecutor;
import io.orprotocol.packet.PacketManager;
public class Executor20GroupJoin extends PacketExecutor<Packet20GroupJoin> { public class Executor20GroupJoin extends PacketExecutor<Packet20GroupJoin> {
private final GroupRepository groupRepository = new GroupRepository(); private final GroupRepository groupRepository = new GroupRepository();
private final BufferRepository bufferRepository = new BufferRepository();
private final BufferService bufferService;
public Executor20GroupJoin(PacketManager packetManager) {
this.bufferService = new BufferService(bufferRepository, packetManager);
}
@Override @Override
public void onPacketReceived(Packet20GroupJoin packet, Client client) throws Exception, ProtocolException { public void onPacketReceived(Packet20GroupJoin packet, Client client) throws Exception, ProtocolException {
@@ -53,12 +61,21 @@ public class Executor20GroupJoin extends PacketExecutor<Packet20GroupJoin> {
client.send(packet); client.send(packet);
return; return;
} }
/**
* Все проверки пройдены, клиент может вступить в группу
*/
packet.setStatus(NetworkGroupStatus.JOINED);
/**
* Кладем пакет в буфер для будущей синхронизации
*/
this.bufferService.pushPacketToBuffer("server", eciAuthentificate.getPublicKey(), packet);
/** /**
* Добавляем клиента в группу и возвращаем клиенту статус JOINED * Добавляем клиента в группу и возвращаем клиенту статус JOINED
*/ */
this.groupRepository.addMemberToGroup(groupId, eciAuthentificate.getPublicKey()); this.groupRepository.addMemberToGroup(groupId, eciAuthentificate.getPublicKey());
packet.setStatus(NetworkGroupStatus.JOINED); /**
* Возвращаем клиенту ответный пакет с статусом JOINED
*/
client.send(packet); client.send(packet);
} }

View File

@@ -38,6 +38,14 @@ public class Executor7Read extends PacketExecutor<Packet7Read> {
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return; return;
} }
if(!eciAuthentificate.getPublicKey().equals(fromPublicKey)){
/**
* Клиент пытается прочитать сообщения от имени того, кем не является
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
packet.setPrivateKey(""); packet.setPrivateKey("");
if(toPublicKey.startsWith("#group:")){ if(toPublicKey.startsWith("#group:")){

View File

@@ -14,11 +14,20 @@ public class Packet20GroupJoin extends Packet {
private String groupId; private String groupId;
private NetworkGroupStatus status; private NetworkGroupStatus status;
/**
* Строка группы, которая содержит информацию о группе, такую как ее название, описание и ключ
* Строка зашифрована обратимым шифрованием, где ключом выступает - реальный приватный ключ
* входящего в группу клиента. Нужно это для будущей синхронзации, так как клиенту на его другом
* устройстве нужно получить ключ группы и ее информацию. Сервер расшифровать эту строку не может. Эту
* строку может расшифровать только клиент, так как она зашифрована его приватным ключом
*/
private String groupString;
@Override @Override
public void read(Stream stream) { public void read(Stream stream) {
this.groupId = stream.readString(); this.groupId = stream.readString();
this.status = NetworkGroupStatus.fromCode(stream.readInt8()); this.status = NetworkGroupStatus.fromCode(stream.readInt8());
this.groupString = stream.readString();
} }
@Override @Override
@@ -27,6 +36,7 @@ public class Packet20GroupJoin extends Packet {
stream.writeInt16(this.packetId); stream.writeInt16(this.packetId);
stream.writeString(this.groupId); stream.writeString(this.groupId);
stream.writeInt8(this.status.getCode()); stream.writeInt8(this.status.getCode());
stream.writeString(this.groupString);
return stream; return stream;
} }
@@ -62,4 +72,20 @@ public class Packet20GroupJoin extends Packet {
this.status = status; this.status = status;
} }
/**
* Получить строку группы, которая содержит информацию о группе, такую как ее название, описание и ключ
* @return строка группы
*/
public String getGroupString() {
return groupString;
}
/**
* Установить строку группы, которая содержит информацию о группе, такую как ее название, описание и ключ
* @param groupString строка группы
*/
public void setGroupString(String groupString) {
this.groupString = groupString;
}
} }

View File

@@ -1,12 +1,12 @@
package im.rosetta.service.dispatch; package im.rosetta.service.dispatch;
import java.util.HashSet;
import java.util.List; import java.util.List;
import im.rosetta.client.ClientManager; import im.rosetta.client.ClientManager;
import im.rosetta.client.tags.ECIAuthentificate; import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.database.repository.BufferRepository; import im.rosetta.database.repository.BufferRepository;
import im.rosetta.database.repository.GroupRepository; import im.rosetta.database.repository.GroupRepository;
import im.rosetta.packet.Packet11Typeing;
import im.rosetta.packet.base.PacketBaseDialog; import im.rosetta.packet.base.PacketBaseDialog;
import im.rosetta.service.services.BufferService; import im.rosetta.service.services.BufferService;
@@ -19,7 +19,7 @@ import io.orprotocol.packet.PacketManager;
* Такой диспетчер нужен для того, чтобы не загромождать логику обработчиков сообщений, а так же для того, чтобы * Такой диспетчер нужен для того, чтобы не загромождать логику обработчиков сообщений, а так же для того, чтобы
* централизовать логику отправки сообщений и сохранения их в буфер * централизовать логику отправки сообщений и сохранения их в буфер
* Например, при отправке группового сообщения, диспетчер сам достает участников группы и * Например, при отправке группового сообщения, диспетчер сам достает участников группы и
* отправляет сообщение каждому участнику, а так же сохраняет сообщение в буфер для каждого участника, который офлайн * отправляет сообщение каждому участнику
*/ */
public class MessageDispatcher { public class MessageDispatcher {
@@ -38,6 +38,7 @@ public class MessageDispatcher {
* @param packet пакет с групповым сообщением * @param packet пакет с групповым сообщением
*/ */
public void sendGroup(PacketBaseDialog packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException { public void sendGroup(PacketBaseDialog packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException {
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey(); String toPublicKey = packet.getToPublicKey();
List<String> groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", "")); List<String> groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", ""));
if(groupMembersPublicKeys.isEmpty()){ if(groupMembersPublicKeys.isEmpty()){
@@ -55,7 +56,21 @@ public class MessageDispatcher {
return; return;
} }
/** /**
* Отправляем всем участникам группы, кроме отправителя этот пакет, попутно не забывая проверить, а не один ли он в группе * Ретранслируем сообщение ВСЕМ авторизованным сессиям отправителя КРОМЕ текущей,
* чтобы синхронизировать отправленные сообщения
*/
if(!(packet instanceof Packet11Typeing)){
/**
* Если это пакет печати его не обязательно кэшировать, так как он нужен только
* для отображения статуса печати в реальном времени
* Кладем пакет в буфер для будущей синхронизации и на случай если кто-то из участников оффлайн,
* в toPublicKey при отправке в группу у нас находится #group:groupId
*/
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey.replace("#group:", ""), packet);
this.clientManager.retranslate(client, packet);
}
/**
* Отправляем всем участникам группы, кроме отправителя, попутно проверяем, а не один ли он в группе
*/ */
groupMembersPublicKeys.remove(eciAuthentificate.getPublicKey()); groupMembersPublicKeys.remove(eciAuthentificate.getPublicKey());
if(groupMembersPublicKeys.isEmpty()){ if(groupMembersPublicKeys.isEmpty()){
@@ -65,9 +80,10 @@ public class MessageDispatcher {
*/ */
return; return;
} }
/**
* Отправляем сообщение всем, кто в беседе
*/
this.clientManager.sendPacketToAuthorizedPK(groupMembersPublicKeys, packet); this.clientManager.sendPacketToAuthorizedPK(groupMembersPublicKeys, packet);
//TODO: Сохранить сообщение в буфер для группы, чтобы группы тоже синхронизировались
} }
/** /**
@@ -81,6 +97,12 @@ public class MessageDispatcher {
public void sendPeer(PacketBaseDialog packet, Client client, boolean bufferizationNeed) throws ProtocolException { public void sendPeer(PacketBaseDialog packet, Client client, boolean bufferizationNeed) throws ProtocolException {
String fromPublicKey = packet.getFromPublicKey(); String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey(); String toPublicKey = packet.getToPublicKey();
/**
* Ретранслируем сообщение ВСЕМ авторизованным сессиям отправителя КРОМЕ текущей,
* чтобы синхронизировать отправленные сообщения
*/
this.clientManager.retranslate(client, packet);
this.clientManager.sendPacketToAuthorizedPK(toPublicKey, packet); this.clientManager.sendPacketToAuthorizedPK(toPublicKey, packet);
if(!bufferizationNeed){ if(!bufferizationNeed){
@@ -94,11 +116,6 @@ public class MessageDispatcher {
* Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя * Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя
*/ */
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet); this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet);
/**
* Ретранслируем сообщение всем авторизованным сессиям отправителя, чтобы синхронизировать отправленные сообщения
*/
this.retranslate(packet, client);
} }
/** /**
@@ -114,34 +131,4 @@ public class MessageDispatcher {
this.sendPeer(packet, client, true); this.sendPeer(packet, client, true);
} }
/**
* Сообщает всем авторизованным сессиям отправителя о том, что он отправил сообщения,
* для того чтобы синхронизировать отправленные сообщения на всех устройствах отправителя
* @param packet пакет сообщения
* @param client клиент отправляющий пакет
* @throws ProtocolException
*/
public void retranslate(PacketBaseDialog packet, Client client) throws ProtocolException {
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
HashSet<Client> clients = this.clientManager.getClientIndexer()
.getClients(ECIAuthentificate.class, "publicKey", eciAuthentificate.getPublicKey());
if(clients == null){
/**
* Нет авторизованных сессий с таким публичным ключом
*/
return;
}
for(Client c : clients){
/**
* Проходим по всем устройствам с таким публичным ключом и ретранслируем им пакет, кроме того устройства что
* отправило пакет
*/
if(c.equals(client)){
continue;
}
c.send(packet);
}
}
} }

View File

@@ -8,6 +8,7 @@ import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.database.QuerySession; import im.rosetta.database.QuerySession;
import im.rosetta.database.entity.Buffer; import im.rosetta.database.entity.Buffer;
import im.rosetta.database.repository.BufferRepository; import im.rosetta.database.repository.BufferRepository;
import im.rosetta.database.repository.GroupRepository;
import im.rosetta.exception.UnauthorizedExeception; import im.rosetta.exception.UnauthorizedExeception;
import im.rosetta.packet.Packet7Read; import im.rosetta.packet.Packet7Read;
import im.rosetta.service.Service; import im.rosetta.service.Service;
@@ -20,6 +21,7 @@ import io.orprotocol.packet.PacketManager;
public class BufferService extends Service<BufferRepository> { public class BufferService extends Service<BufferRepository> {
private PacketManager packetManager; private PacketManager packetManager;
private GroupRepository groupRepository = new GroupRepository();
public BufferService(BufferRepository repository, PacketManager packetManager) { public BufferService(BufferRepository repository, PacketManager packetManager) {
super(repository); super(repository);
@@ -41,10 +43,24 @@ public class BufferService extends Service<BufferRepository> {
*/ */
throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer"); throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer");
} }
/**
* Получаем группы клиента, и исходя из этого формируем список каких пакетов нам нужно взять из базы
*/
List<String> clientGroups = this.groupRepository.findGroupsByMember(eciAuthentificate.getPublicKey());
List<String> toValue = new ArrayList<>();
/**
* Добавляем публичный ключ клиента, так как ему нужны пакеты, которые были отправлены ему напрямую, а не в группу
*/
toValue.add(eciAuthentificate.getPublicKey());
/**
* Добавляем группы клиента, так как ему нужны пакеты, которые были отправлены в эти группы
*/
toValue.addAll(clientGroups);
String toPublicKey = eciAuthentificate.getPublicKey(); String toPublicKey = eciAuthentificate.getPublicKey();
String hql = "FROM Buffer WHERE (to = :to OR from = :from) AND timestamp > :timestamp ORDER BY timestamp ASC"; String hql = "FROM Buffer WHERE (to IN (:to) OR from = :from) AND timestamp > :timestamp ORDER BY timestamp ASC";
HashMap<String, Object> parameters = new HashMap<>(); HashMap<String, Object> parameters = new HashMap<>();
parameters.put("to", toPublicKey); parameters.put("to", toValue);
parameters.put("from", toPublicKey); parameters.put("from", toPublicKey);
parameters.put("timestamp", fromTimestampMs); parameters.put("timestamp", fromTimestampMs);
List<Packet> packets = new ArrayList<>(); List<Packet> packets = new ArrayList<>();