Files
rosetta-wss/src/main/java/im/rosetta/executors/Executor26SignalPeer.java
RoyceDa 145aaf8288
All checks were successful
Build rosetta-wss / build (push) Successful in 1m31s
Исправление кика с сервера при гонке с закрытием RTC на SFU
2026-04-07 15:24:45 +02:00

223 lines
11 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package im.rosetta.executors;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import im.rosetta.Failures;
import im.rosetta.calls.CallManager;
import im.rosetta.calls.CallSession;
import im.rosetta.client.ClientManager;
import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.runtime.NetworkSignalType;
import im.rosetta.service.dispatch.push.PushNotifyDispatcher;
import im.rosetta.service.dispatch.runtime.PushType;
import im.rosetta.service.services.ForwardUnitService;
import io.g365sfu.Room;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor;
/**
* Используется в Peer To Peer звонках, в групповых звонках другой сигналинг
*/
public class Executor26SignalPeer extends PacketExecutor<Packet26SignalPeer> {
private ClientManager clientManager;
private ForwardUnitService fus;
private PushNotifyDispatcher pushNotifyDispatcher = new PushNotifyDispatcher();
/**
* Сигналы, которые может выполнять только авторизованный пользователь,
* все сигналы, которые не входят в этот перечень, будут доступны для
* исполнения без авторизации.
*/
private Set<NetworkSignalType> authentificatedTypes = new HashSet<>(){{
add(NetworkSignalType.CALL);
}};
/**
* Менеджер звонков, который реализует весь необхоимый функционал для управления звонками,
* например проверку занятости пользователя, и тд
*/
private CallManager callManager;
public Executor26SignalPeer(ClientManager clientManager, ForwardUnitService fus, CallManager callManager) {
this.clientManager = clientManager;
this.fus = fus;
this.callManager = callManager;
}
@Override
public void onPacketReceived(Packet26SignalPeer packet, Client client) throws Exception, ProtocolException {
String src = packet.getSrc();
String dst = packet.getDst();
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
NetworkSignalType type = packet.getSignalType();
if ((eciAuthentificate == null || !eciAuthentificate.hasAuthorized())
&& this.authentificatedTypes.contains(type)) {
/**
* Если клиент не авторизован, то мы не будем обрабатывать его сигналы на инициализацию звонка и создание комнаты
* и просто отключим его от сервера.
*/
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return;
}
if(src != null && !src.equals(eciAuthentificate.getPublicKey()) && authentificatedTypes.contains(type)) {
/**
* Если src в пакете не совпадает с авторизованным PK клиента, то это может означать, что клиент пытается
* отправить сигнал от другого пользователя, отключаем его от сервера.
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
if(type == NetworkSignalType.CALL) {
/**
* Инициируется звонок от src к dst, проверяем, что dst не занят другим звонком, если занят, то отправляем сигнал END_CALL_BECAUSE_BUSY обратно src
*/
if(this.callManager.isBusy(dst) || this.callManager.isBusy(src)) {
/**
* Получатель сигнала уже находится в другой комнате, значит он занят другим звонком, отправляем сигнал END_CALL_BECAUSE_BUSY обратно src
*/
Packet26SignalPeer responsePacket = new Packet26SignalPeer();
responsePacket.setSignalType(NetworkSignalType.END_CALL_BECAUSE_BUSY);
client.send(responsePacket);
return;
}
/**
* Генерируем CallID и JoinToken
*/
String callId = UUID.randomUUID().toString();
String joinToken = UUID.randomUUID().toString();
packet.setJoinToken(joinToken);
packet.setCallId(callId);
/**
* Создаем сессию звонка и добавляем в нее звонящего
*/
CallSession session = this.callManager.createCall(callId, joinToken);
session.joinCall(src, client);
/**
* Добавляем dst в ringing, чтобы пометить, что ему поступает звонок
*/
session.addRinging(dst);
/**
* Получатель сигнала не занят, отправляем ему пуш уведомление о входящем звонке и сигнал CALL для инициализации звонка
*/
pushNotifyDispatcher.sendPush(dst, new HashMap<>(){{
put("type", PushType.CALL);
put("dialog", src);
put("callId", callId);
put("joinToken", joinToken);
}});
/**
* Отправляем сигнал CALL всем авторизованным устройствам вызываемого абонента
*/
this.clientManager.sendPacketToAuthorizedPK(dst, packet);
return;
}
if(type == NetworkSignalType.ACCEPT){
String callId = packet.getCallId();
String joinToken = packet.getJoinToken();
CallSession session = this.callManager.getCallSession(callId, joinToken);
if(session == null) {
/**
* Сессии звонка нет
*/
client.disconnect(Failures.NO_CALL_SESSION);
return;
}
if(!session.isValidSource(src)) {
/**
* Клиент не состоит в сессии звонка, отключаем его от сервера, так как он отправляет некорректные данные
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
Room room = this.fus.createRoom();
session.setRoom(room);
session.joinCall(src, client);
for(String participant : session.clients.keySet()) {
room.addParticipant(participant);
}
session.sendPacket(packet, client);
/**
* Сбрасываем вызов на всех остальных устройствах пользователя, который принимает звонок,
* чтобы он не смог принять или отклонить звонок с другого устройства
*/
Packet26SignalPeer endCallOtherDevices = new Packet26SignalPeer();
endCallOtherDevices.setSignalType(NetworkSignalType.END_CALL);
this.clientManager.sendPacketToAuthorizedPK(src, endCallOtherDevices, client);
return;
}
if(type == NetworkSignalType.KEY_EXCHANGE){
/**
* Ретранслируем ключи в рамках сессии
*/
CallSession session = this.callManager.getCallSession(client);
if(session == null) {
/**
* Сессии звонка нет
*/
client.disconnect(Failures.NO_CALL_SESSION);
return;
}
/**
* Обмениваемся ключами в рамках сессии, ретранслируя их всем участникам сессии, кроме отправителя
*/
session.sendPacket(packet, client);
return;
}
if(type == NetworkSignalType.END_CALL) {
/**
* Ретранслируем сигнал окончания звонка всем участникам сессии, кроме отправителя, и удаляем сессию
*/
/**
* Сначала получаем сессию по сокету отправителя пакета, если не находим, то пробуем найти сессию по callId и joinToken из пакета, если не находим,
* то отключаем клиента от сервера, так как он отправляет некорректные данные
*/
CallSession session = this.callManager.getCallSession(client);
if(session == null) {
String callId = packet.getCallId();
String joinToken = packet.getJoinToken();
session = this.callManager.getCallSession(callId, joinToken);
}
if(session == null) {
/**
* Сессии звонка нет, скорее всего она была удалена при обрыве RTC Peer Connection,
* при срабатывании RTCPeerConnection::close на клиенте раньше, чем клиент отправил сигнал END_CALL
*/
return;
}
/**
* Отправляем сигнал окончания звонка всем участникам сессии, кроме отправителя
*/
session.sendPacket(packet, client);
/**
* Отправляем пакет вызываемым (ringing) пользователям (которые еще не в сессии)
*/
this.callManager.sendPacketToRinging(session, packet);
/**
* Удаляем сессию из активных сессий звонков
*/
this.callManager.removeSession(session);
return;
}
if(type == NetworkSignalType.ACTIVE) {
/**
* Клиент сообщил, что прошел стадию обмена ключами и звонок активен
*/
CallSession session = this.callManager.getCallSession(client);
if(session == null) {
/**
* Сессии звонка нет
*/
client.disconnect(Failures.NO_CALL_SESSION);
return;
}
session.sendPacket(packet, null);
}
}
}