Refractor Locker -> fix one concurent exception on HashMap + clean the code

This commit is contained in:
jeffcheasey88 2025-01-31 11:38:41 +01:00
parent a79876b71c
commit 012978d759
2 changed files with 38 additions and 47 deletions

View file

@ -29,20 +29,20 @@ public class Client<U extends User> extends Thread{
writer.flush(); writer.flush();
writer.close(); writer.close();
}catch(AuthException e){ }catch(AuthException e){
this.router.getExceptionLogger().setValue(e); this.router.getExceptionLogger().pushValue(e);
}catch(Throwable e){ }catch(Throwable e){
this.router.getExceptionLogger().setValue(e); this.router.getExceptionLogger().pushValue(e);
if(context != null && context.getResponseCode() == 0){ if(context != null && context.getResponseCode() == 0){
try{ try{
context.response(500); context.response(500);
writer.flush(); writer.flush();
writer.close(); writer.close();
}catch(Exception ex){ }catch(Exception ex){
this.router.getExceptionLogger().setValue(ex); this.router.getExceptionLogger().pushValue(ex);
} }
} }
} }
if(context != null) router.getLogger().setValue(context); if(context != null) router.getLogger().pushValue(context);
} }
private User isLogin(RequestType type, HttpReader reader) throws AuthException{ private User isLogin(RequestType type, HttpReader reader) throws AuthException{
@ -54,7 +54,7 @@ public class Client<U extends User> extends Thread{
writer.flush(); writer.flush();
writer.close(); writer.close();
}catch(Exception ex){ }catch(Exception ex){
this.router.getExceptionLogger().setValue(ex); this.router.getExceptionLogger().pushValue(ex);
} }
throw new AuthException(e); throw new AuthException(e);
} }

View file

@ -1,9 +1,8 @@
package dev.peerat.framework; package dev.peerat.framework;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -13,7 +12,7 @@ public class Locker<V>{
private Map<Key, BlockingQueue<V>> map; private Map<Key, BlockingQueue<V>> map;
public Locker(){ public Locker(){
this.map = new HashMap<>(); this.map = new ConcurrentHashMap<>();
} }
public void init(Key key){ public void init(Key key){
@ -28,21 +27,20 @@ public class Locker<V>{
return this.map.get(key); return this.map.get(key);
} }
public void setValue(V value){ public void pushValue(V value){
for(Entry<Key, BlockingQueue<V>> entry : this.map.entrySet()){ for(BlockingQueue<V> queue : this.map.values()){
entry.getValue().add(value); queue.add(value);
this.unlock(entry.getKey()); unlock(queue);
} }
} }
public V getValue(Key key){ public V getValue(Key key) throws InterruptedException{
BlockingQueue<V> queue = get(key); BlockingQueue<V> queue = get(key);
if(queue.isEmpty()) return null; lock(queue);
return queue.poll(); return queue.poll();
} }
public void lock(Key key) throws InterruptedException{ private void lock(BlockingQueue<V> queue) throws InterruptedException{
BlockingQueue<V> queue = get(key);
if(queue.isEmpty()){ if(queue.isEmpty()){
synchronized(queue){ synchronized(queue){
queue.wait(); queue.wait();
@ -50,8 +48,7 @@ public class Locker<V>{
} }
} }
public void unlock(Key key){ private void unlock(BlockingQueue<V> queue){
BlockingQueue<V> queue = get(key);
synchronized(queue){ synchronized(queue){
queue.notify(); queue.notify();
} }
@ -61,46 +58,40 @@ public class Locker<V>{
Key key = new Key(); Key key = new Key();
init(key); init(key);
try { try {
while(true){ while(true) action.accept(getValue(key));
lock(key);
V value = getValue(key);
action.accept(value);
}
}catch(Exception e){} }catch(Exception e){}
remove(key); remove(key);
} }
public void listen(Consumer<V> action, Consumer<Exception> onClose){
Key key = new Key();
init(key);
try {
while(true) action.accept(getValue(key));
}catch(Exception e){
onClose.accept(e);
}
remove(key);
}
public void listen(Supplier<Boolean> condition, Consumer<V> action){ public void listen(Supplier<Boolean> condition, Consumer<V> action){
Key key = new Key(); Key key = new Key();
init(key); init(key);
try { try {
while(condition.get()){ while(condition.get()) action.accept(getValue(key));
lock(key);
V value = getValue(key);
action.accept(value);
}
}catch(Exception e){} }catch(Exception e){}
remove(key); remove(key);
} }
public Thread listenAsync(Consumer<V> action){ public void listen(Supplier<Boolean> condition, Consumer<V> action, Consumer<Exception> onClose){
Thread thread = new Thread(new Runnable(){ Key key = new Key();
public void run(){ init(key);
listen(action); try {
while(condition.get()) action.accept(getValue(key));
}catch(Exception e){
onClose.accept(e);
} }
}); remove(key);
thread.start();
return thread;
}
public Thread listenAsync(Supplier<Boolean> condition,Consumer<V> action){
Thread thread = new Thread(new Runnable(){
public void run(){
listen(condition, action);
}
});
thread.start();
return thread;
} }
public static class Key{ public Key(){} } public static class Key{ public Key(){} }