Исправление зависших сессий на сервере при преждевременном RTC close

This commit is contained in:
RoyceDa
2026-04-07 14:43:32 +02:00
parent faaffd86d0
commit bc0a64f450
5 changed files with 57 additions and 18 deletions

View File

@@ -116,7 +116,7 @@ public class Boot {
int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ? int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ?
Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7; Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7;
this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger); this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger);
this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager); this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager, this.callManager);
} }
/** /**

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit;
import im.rosetta.client.ClientManager; import im.rosetta.client.ClientManager;
import im.rosetta.packet.Packet26SignalPeer; import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.runtime.NetworkSignalType; import im.rosetta.packet.runtime.NetworkSignalType;
import io.g365sfu.Room;
import io.orprotocol.ProtocolException; import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client; import io.orprotocol.client.Client;
import io.orprotocol.packet.Packet; import io.orprotocol.packet.Packet;
@@ -17,13 +18,13 @@ public class CallManager {
private List<CallSession> callSessions = new ArrayList<>(); private List<CallSession> callSessions = new ArrayList<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private static final long RINGING_TIMEOUT = 10 * 1000; // 1 минута private static final long RINGING_TIMEOUT = 30 * 1000;
private ClientManager clientManager; private ClientManager clientManager;
public CallManager(ClientManager clientManager) { public CallManager(ClientManager clientManager) {
this.clientManager = clientManager; this.clientManager = clientManager;
scheduler.scheduleAtFixedRate(this::cleanupCallSessions, 0, 30, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::cleanupCallSessions, 0, 1, TimeUnit.SECONDS);
} }
public void cleanupCallSessions() { public void cleanupCallSessions() {
@@ -75,6 +76,15 @@ public class CallManager {
return null; return null;
} }
public CallSession getCallSession(Room room) {
for (CallSession session : this.callSessions) {
if (session.getRoom() != null && session.getRoom().equals(room)) {
return session;
}
}
return null;
}
public boolean isBusy(String publicKey) { public boolean isBusy(String publicKey) {
for (CallSession session : this.callSessions) { for (CallSession session : this.callSessions) {
if (session.clients.containsKey(publicKey)) { if (session.clients.containsKey(publicKey)) {

View File

@@ -70,6 +70,15 @@ public class CallSession {
return this.joinToken; return this.joinToken;
} }
public void leaveCall(String publicKey) {
if(this.clients.containsKey(publicKey)) {
this.clients.remove(publicKey);
}
if(this.ringing.containsKey(publicKey)) {
this.ringing.remove(publicKey);
}
}
public void addRinging(String publicKey) { public void addRinging(String publicKey) {
this.ringing.put(publicKey, System.currentTimeMillis()); this.ringing.put(publicKey, System.currentTimeMillis());
} }

View File

@@ -7,6 +7,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import im.rosetta.calls.CallManager;
import im.rosetta.calls.CallSession;
import im.rosetta.client.ClientManager; import im.rosetta.client.ClientManager;
import im.rosetta.logger.Logger; import im.rosetta.logger.Logger;
import im.rosetta.logger.enums.Color; import im.rosetta.logger.enums.Color;
@@ -33,10 +35,12 @@ public class ForwardUnitService {
private Set<SFU> sfuConnections = ConcurrentHashMap.newKeySet(); private Set<SFU> sfuConnections = ConcurrentHashMap.newKeySet();
private ClientManager clientManager; private ClientManager clientManager;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private CallManager callManager;
public ForwardUnitService(Logger logger, ClientManager clientManager) { public ForwardUnitService(Logger logger, ClientManager clientManager, CallManager callManager) {
this.logger = logger; this.logger = logger;
this.clientManager = clientManager; this.clientManager = clientManager;
this.callManager = callManager;
this.sfuConnectionsSheduler(); this.sfuConnectionsSheduler();
} }
@@ -135,25 +139,33 @@ public class ForwardUnitService {
public void onPeerDisconnected(DisconnectedPeer disconnectedPeer) throws ProtocolException { public void onPeerDisconnected(DisconnectedPeer disconnectedPeer) throws ProtocolException {
Room room = disconnectedPeer.getRoom(); Room room = disconnectedPeer.getRoom();
if(disconnectedPeer.getReason() != DisconnectReason.FAILED){ CallSession callSession = this.callManager.getCallSession(room);
callSession.leaveCall(disconnectedPeer.getPeerId());
if(disconnectedPeer.getReason() == DisconnectReason.FAILED){
/** /**
* Если у нас произошло штатное отключение, а не в результате обрыва связи - то не нужно отправлять * Произошло нештатное отключение клиента от сервера SFU, например, из-за сбоя сети
* оппонентам пакеты о том, что участник отключился в результате обрыва связи.
*/ */
return; if(callSession.shouldRemove()){
}
for(String peerId : room.getParticipants()) {
/**
* Уведомляем все пиры, что соединение с пиром было потеряно
*/
if(room.getParticipants().size() == 1) {
/**
* Звонок был завершен, так как в комнате остался только один участник, который не может продолжать звонок в одиночку.
*/
Packet26SignalPeer packet = new Packet26SignalPeer(); Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED); packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED);
this.clientManager.sendPacketToAuthorizedPK(peerId, packet); callSession.sendPacket(packet, null);
this.callManager.removeSession(callSession);
} }
return;
}
if(disconnectedPeer.getReason() == DisconnectReason.CLOSED){
/**
* Клиент намеренно покинул звонок, например, отключился от SFU сервера, так как завершил звонок или вышел из комнаты
* (например если клиент отрабатывает выход из звонка по кнопке END не правильно)
*/
if(callSession.shouldRemove()){
Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL);
callSession.sendPacket(packet, null);
this.callManager.removeSession(callSession);
}
return;
} }
} }

View File

@@ -157,4 +157,12 @@ public class Room {
buffer.flip(); buffer.flip();
this.sfu.getConnection().send(buffer); this.sfu.getConnection().send(buffer);
} }
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Room room = (Room) obj;
return roomId.equals(room.roomId);
}
} }