Compare commits

...

6 Commits

7 changed files with 142 additions and 35 deletions

View File

@@ -116,7 +116,7 @@ public class Boot {
int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ?
Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7;
this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger);
this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager);
this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager, this.callManager);
}
/**

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit;
import im.rosetta.client.ClientManager;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.runtime.NetworkSignalType;
import io.g365sfu.Room;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.Packet;
@@ -17,13 +18,13 @@ public class CallManager {
private List<CallSession> callSessions = new ArrayList<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private static final long RINGING_TIMEOUT = 10 * 1000; // 1 минута
private static final long RINGING_TIMEOUT = 30 * 1000;
private ClientManager clientManager;
public CallManager(ClientManager clientManager) {
this.clientManager = clientManager;
scheduler.scheduleAtFixedRate(this::cleanupCallSessions, 0, 30, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(this::cleanupCallSessions, 0, 1, TimeUnit.SECONDS);
}
public void cleanupCallSessions() {
@@ -75,6 +76,15 @@ public class CallManager {
return null;
}
public CallSession getCallSession(Room room) {
for (CallSession session : this.callSessions) {
if (session.getRoom() != null && session.getRoom().equals(room)) {
return session;
}
}
return null;
}
public boolean isBusy(String publicKey) {
for (CallSession session : this.callSessions) {
if (session.clients.containsKey(publicKey)) {

View File

@@ -18,7 +18,9 @@ public class CallSession {
* Клиенты в этом списке не могут принимать другие звонки, так как они уже заняты дозвоном,
* но они еще не в звонке, так как не приняли звонок
*
* Клиенты удаляются из этого списка, когда они принимают звонок или отклоняют его, тогда они либо переходят в звонок, либо становятся свободными для других звонков
* Клиенты удаляются из этого списка, когда они принимают звонок или отклоняют его,
* тогда они либо переходят в звонок, либо становятся свободными для других звонков
* pk -> время начала дозвона (timestamp в миллисекундах)
*/
public HashMap<String, Long> ringing;
/**
@@ -48,6 +50,16 @@ public class CallSession {
this.clients.put(publicKey, client);
}
/**
* Проверяет, может ли этот публичный ключ выполнять какие-либо действия в рамках этой сессии звонка,
* чтобы не допустить выполнение действий от посторонних публичных ключей, которые не участвуют в звонке
* @param publicKey Публичный ключ для проверки
* @return true, если этот публичный ключ может выполнять действия в рамках этой сессии звонка, false иначе
*/
public boolean isValidSource(String publicKey) {
return this.ringing.containsKey(publicKey) || this.clients.containsKey(publicKey);
}
/**
* Получаем публичный ключ клиента по его сокету, чтобы понимать, кто отправляет сигналы в рамках звонка
* @param client Сокет клиента, для которого нужно получить публичный ключ
@@ -70,6 +82,15 @@ public class CallSession {
return this.joinToken;
}
public void leaveCall(String publicKey) {
if(this.clients.containsKey(publicKey)) {
this.clients.remove(publicKey);
}
if(this.ringing.containsKey(publicKey)) {
this.ringing.remove(publicKey);
}
}
public void addRinging(String publicKey) {
this.ringing.put(publicKey, System.currentTimeMillis());
}

View File

@@ -56,9 +56,10 @@ public class ClientManager {
* Отправить пакет ВСЕМ АВТОРИЗОВАННЫМ клиентам с публичным ключом publicKey
* @param publicKey публичный ключ получателя
* @param packet пакет для отправки
* @param exclude клиент, который не должен получать этот пакет, может быть null
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToAuthorizedPK(String publicKey, Packet packet) throws ProtocolException {
public void sendPacketToAuthorizedPK(String publicKey, Packet packet, Client exclude) throws ProtocolException {
Set<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
if(clients == null){
/**
@@ -77,12 +78,55 @@ public class ClientManager {
continue;
}
/**
* Отправляем пакет каждому клиенту с таким публичным ключом (то есть всем его авторизованным сессиям/устройствам)
* Отправляем пакет каждому клиенту с таким публичным ключом (то есть всем его авторизованным сессиям/устройствам),
* исключая клиента exclude
*/
if(exclude != null && client.equals(exclude)){
/**
* Этот клиент является исключением, он не должен получать этот пакет
*/
continue;
}
client.send(packet);
}
}
/**
* Отправить пакет ВСЕМ АВТОРИЗОВАННЫМ клиентам с публичным ключом publicKey
* @param publicKey публичный ключ получателя
* @param packet пакет для отправки
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToAuthorizedPK(String publicKey, Packet packet) throws ProtocolException {
this.sendPacketToAuthorizedPK(publicKey, packet, null);
}
/**
* Отправить пакет всем клиентам с публичными ключами из списка publicKeys
* @param publicKeys список публичных ключей получателей
* @param packet пакет для отправки
* @param exclude клиент, который не должен получать этот пакет, может быть null
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToAuthorizedPK(List<String> publicKeys, Packet packet, Client exclude) throws ProtocolException {
for(String publicKey : publicKeys){
this.sendPacketToAuthorizedPK(publicKey, packet, exclude);
}
}
/**
* Отправить пакет всем клиентам с публичными ключами из списка publicKeys
* @param publicKeys список публичных ключей получателей
* @param packet пакет для отправки
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToAuthorizedPK(List<String> publicKeys, Packet packet) throws ProtocolException {
for(String publicKey : publicKeys){
this.sendPacketToAuthorizedPK(publicKey, packet, null);
}
}
/**
* Отправить пакет всем клиентам с публичными ключом как у client, кроме клиента client, который является отправителем и не должен получать этот пакет
* @param client клиент
@@ -111,18 +155,6 @@ public class ClientManager {
}
}
/**
* Отправить пакет всем клиентам с публичными ключами из списка publicKeys
* @param publicKeys список публичных ключей получателей
* @param packet пакет для отправки
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToAuthorizedPK(List<String> publicKeys, Packet packet) throws ProtocolException {
for(String publicKey : publicKeys){
this.sendPacketToAuthorizedPK(publicKey, packet);
}
}
/**
* Получить список клиентов по публичному ключу (get PublicKey clients), могут быть неавторизованные клиенты
* @param publicKey публичный ключ клиента

View File

@@ -127,6 +127,13 @@ public class Executor26SignalPeer extends PacketExecutor<Packet26SignalPeer> {
client.disconnect(Failures.NO_CALL_SESSION);
return;
}
if(!session.isValidSource(src)) {
/**
* Клиент не состоит в сессии звонка, отключаем его от сервера, так как он отправляет некорректные данные
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
Room room = this.fus.createRoom();
session.setRoom(room);
session.joinCall(src, client);
@@ -134,6 +141,13 @@ public class Executor26SignalPeer extends PacketExecutor<Packet26SignalPeer> {
room.addParticipant(participant);
}
session.sendPacket(packet, client);
/**
* Сбрасываем вызов на всех остальных устройствах пользователя, который принимает звонок,
* чтобы он не смог принять или отклонить звонок с другого устройства
*/
Packet26SignalPeer endCallOtherDevices = new Packet26SignalPeer();
endCallOtherDevices.setSignalType(NetworkSignalType.END_CALL);
this.clientManager.sendPacketToAuthorizedPK(src, endCallOtherDevices, client);
return;
}
if(type == NetworkSignalType.KEY_EXCHANGE){
@@ -170,12 +184,22 @@ public class Executor26SignalPeer extends PacketExecutor<Packet26SignalPeer> {
}
if(session == null) {
/**
* Сессии звонка нет
* Сессии звонка нет, скорее всего она была удалена при обрыве RTC Peer Connection,
* при срабатывании RTCPeerConnection::close на клиенте раньше, чем клиент отправил сигнал END_CALL
*/
client.disconnect(Failures.NO_CALL_SESSION);
return;
}
/**
* Отправляем сигнал окончания звонка всем участникам сессии, кроме отправителя
*/
session.sendPacket(packet, client);
/**
* Отправляем пакет вызываемым (ringing) пользователям (которые еще не в сессии)
*/
this.callManager.sendPacketToRinging(session, packet);
/**
* Удаляем сессию из активных сессий звонков
*/
this.callManager.removeSession(session);
return;
}

View File

@@ -7,6 +7,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import im.rosetta.calls.CallManager;
import im.rosetta.calls.CallSession;
import im.rosetta.client.ClientManager;
import im.rosetta.logger.Logger;
import im.rosetta.logger.enums.Color;
@@ -33,10 +35,12 @@ public class ForwardUnitService {
private Set<SFU> sfuConnections = ConcurrentHashMap.newKeySet();
private ClientManager clientManager;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private CallManager callManager;
public ForwardUnitService(Logger logger, ClientManager clientManager) {
public ForwardUnitService(Logger logger, ClientManager clientManager, CallManager callManager) {
this.logger = logger;
this.clientManager = clientManager;
this.callManager = callManager;
this.sfuConnectionsSheduler();
}
@@ -135,25 +139,33 @@ public class ForwardUnitService {
public void onPeerDisconnected(DisconnectedPeer disconnectedPeer) throws ProtocolException {
Room room = disconnectedPeer.getRoom();
if(disconnectedPeer.getReason() != DisconnectReason.FAILED){
CallSession callSession = this.callManager.getCallSession(room);
callSession.leaveCall(disconnectedPeer.getPeerId());
if(disconnectedPeer.getReason() == DisconnectReason.FAILED){
/**
* Если у нас произошло штатное отключение, а не в результате обрыва связи - то не нужно отправлять
* оппонентам пакеты о том, что участник отключился в результате обрыва связи.
*/
return;
}
for(String peerId : room.getParticipants()) {
/**
* Уведомляем все пиры, что соединение с пиром было потеряно
*/
if(room.getParticipants().size() == 1) {
/**
* Звонок был завершен, так как в комнате остался только один участник, который не может продолжать звонок в одиночку.
* Произошло нештатное отключение клиента от сервера SFU, например, из-за сбоя сети
*/
if(callSession.shouldRemove()){
Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED);
this.clientManager.sendPacketToAuthorizedPK(peerId, packet);
callSession.sendPacket(packet, null);
this.callManager.removeSession(callSession);
}
return;
}
if(disconnectedPeer.getReason() == DisconnectReason.CLOSED){
/**
* Клиент намеренно покинул звонок, например, отключился от SFU сервера, так как завершил звонок или вышел из комнаты
* (например если клиент отрабатывает выход из звонка по кнопке END не правильно)
*/
if(callSession.shouldRemove()){
Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL);
callSession.sendPacket(packet, null);
this.callManager.removeSession(callSession);
}
return;
}
}

View File

@@ -157,4 +157,12 @@ public class Room {
buffer.flip();
this.sfu.getConnection().send(buffer);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Room room = (Room) obj;
return roomId.equals(room.roomId);
}
}