24282 2 lat temu
rodzic
commit
a4063d510b

+ 1 - 2
src/main/java/com/fjhx/MyMain.java

@@ -24,7 +24,7 @@ public class MyMain {
 
 
     public static void main(String[] args) throws IOException {
     public static void main(String[] args) throws IOException {
 
 
-        System.out.println("Version: 2023-01-29 10:46");
+        System.out.println("Version: 2023-01-31 15:03");
 
 
         MyUtil.infoLog("程序启动中...");
         MyUtil.infoLog("程序启动中...");
 
 
@@ -35,7 +35,6 @@ public class MyMain {
 
 
         // 开启webSocket连接
         // 开启webSocket连接
         connectWebSocket();
         connectWebSocket();
-
     }
     }
 
 
     /**
     /**

+ 15 - 8
src/main/java/com/fjhx/service/impl/ServiceImpl.java

@@ -37,6 +37,7 @@ public class ServiceImpl implements Service {
 
 
         String forwardUserId = data.getString(WebSocketConstant.FORWARD_USER_ID);
         String forwardUserId = data.getString(WebSocketConstant.FORWARD_USER_ID);
         String forwardSessionId = data.getString(WebSocketConstant.FORWARD_SESSION_ID);
         String forwardSessionId = data.getString(WebSocketConstant.FORWARD_SESSION_ID);
+
         // 获取指令
         // 获取指令
         String instructions = data.getString("instructions");
         String instructions = data.getString("instructions");
         // 操作上位机类型
         // 操作上位机类型
@@ -78,9 +79,8 @@ public class ServiceImpl implements Service {
             }
             }
         }
         }
 
 
-        if (operationList.size() == 0) {
-            operationMap.remove(sessionId);
-        }
+        System.out.println("put operationList,size() = " + operationList.size());
+        operationMap.put(sessionId, operationList);
     }
     }
 
 
     /**
     /**
@@ -89,8 +89,14 @@ public class ServiceImpl implements Service {
      * @param sessionId 会话id
      * @param sessionId 会话id
      */
      */
     private void close(String sessionId) {
     private void close(String sessionId) {
+
         List<Operation> operationList = operationMap.get(sessionId);
         List<Operation> operationList = operationMap.get(sessionId);
+        if (operationList == null) {
+            MyUtil.errorLog("执行close方法,sessionId=" + sessionId + ",operationList == null");
+        }
+
         if (operationList != null) {
         if (operationList != null) {
+            MyUtil.errorLog("执行close方法,sessionId=" + sessionId + ",operationList.size()=" + operationList.size());
             operationList.forEach(Operation::close);
             operationList.forEach(Operation::close);
             operationMap.remove(sessionId);
             operationMap.remove(sessionId);
         }
         }
@@ -108,15 +114,16 @@ public class ServiceImpl implements Service {
     /**
     /**
      * 获取本次会话id的操作集合
      * 获取本次会话id的操作集合
      */
      */
-    private List<Operation> newOperationList(String sessionId) {
-        List<Operation> operationList = operationMap.computeIfAbsent(sessionId, k -> new ArrayList<>());
+    private synchronized List<Operation> newOperationList(String sessionId) {
+        List<Operation> operationList = operationMap.get(sessionId);
 
 
-        if (operationList.size() > 0) {
+        if (operationList != null && operationList.size() > 0) {
+            MyUtil.errorLog("newOperationList 有数据,size = " + operationList.size());
             operationList.forEach(Operation::close);
             operationList.forEach(Operation::close);
-            operationList.clear();
+            operationMap.remove(sessionId);
         }
         }
 
 
-        return operationList;
+        return new ArrayList<>();
     }
     }
 
 
 }
 }

+ 50 - 13
src/main/java/com/fjhx/service/impl/TcpOperation.java

@@ -6,7 +6,6 @@ import com.alibaba.fastjson.JSONObject;
 import com.fjhx.entity.ConnectConfig;
 import com.fjhx.entity.ConnectConfig;
 import com.fjhx.service.Operation;
 import com.fjhx.service.Operation;
 import com.fjhx.utils.MyUtil;
 import com.fjhx.utils.MyUtil;
-import com.fjhx.utils.ThreadPoolManager;
 import com.rfid.callBack.CallBack;
 import com.rfid.callBack.CallBack;
 import com.rfid.uhf.controller.impl.ReaderR2000;
 import com.rfid.uhf.controller.impl.ReaderR2000;
 import com.rfid.uhf.service.ReaderR2000Service;
 import com.rfid.uhf.service.ReaderR2000Service;
@@ -21,35 +20,73 @@ public class TcpOperation implements Operation {
     private final ReaderR2000Service service = new ReaderR2000ServiceImpl();
     private final ReaderR2000Service service = new ReaderR2000ServiceImpl();
     private final List<ReaderR2000> readerR2000List = new ArrayList<>();
     private final List<ReaderR2000> readerR2000List = new ArrayList<>();
     private final Set<String> rfidSet = new ConcurrentHashSet<>();
     private final Set<String> rfidSet = new ConcurrentHashSet<>();
+    private final List<Thread> threadList = new ArrayList<>();
 
 
     @Override
     @Override
     public void distinctRead(String userId, String sessionId, List<ConnectConfig> connectConfigList, JSONObject data) {
     public void distinctRead(String userId, String sessionId, List<ConnectConfig> connectConfigList, JSONObject data) {
 
 
         for (ConnectConfig connectConfig : connectConfigList) {
         for (ConnectConfig connectConfig : connectConfigList) {
-            ThreadPoolManager.execute(() -> {
-
-                ReaderR2000 reader = service.connect(
-                        connectConfig.getIp(),
-                        connectConfig.getPort(),
-                        getCallBack(userId, sessionId, data)
-                );
 
 
+            Thread thread = new Thread(() -> {
+                ReaderR2000 reader = service.connect(connectConfig.getIp(), connectConfig.getPort(), getCallBack(userId, sessionId, data));
                 service.beginInvV2(reader);
                 service.beginInvV2(reader);
                 readerR2000List.add(reader);
                 readerR2000List.add(reader);
-
             });
             });
+            thread.start();
+            threadList.add(thread);
         }
         }
 
 
     }
     }
 
 
     @Override
     @Override
     public void close() {
     public void close() {
+        MyUtil.errorLog("执行close方法,readerR2000List.size() = " + readerR2000List.size());
+
         for (ReaderR2000 readerR2000 : readerR2000List) {
         for (ReaderR2000 readerR2000 : readerR2000List) {
-            // 结束扫描
-            service.stopInvV2(readerR2000);
-            // 关闭连接
-            service.disconnect(readerR2000);
+
+            try {
+                // 结束扫描
+                service.stopInvV2(readerR2000);
+                // 关闭连接
+                service.disconnect(readerR2000);
+            } catch (Exception e) {
+
+            }
+
+            // boolean stopInvV2Flag;
+            // boolean disconnectFlag;
+            // do {
+            // try {
+            //     // 结束扫描
+            //     stopInvV2Flag = service.stopInvV2(readerR2000);
+            //     // 关闭连接
+            //     disconnectFlag = service.disconnect(readerR2000);
+            // } catch (Exception e) {
+            //     MyUtil.errorLog("关闭程序出现异常");
+            //
+            //     stopInvV2Flag = false;
+            //     disconnectFlag = false;
+            // }
+            //
+            // if (stopInvV2Flag && disconnectFlag) {
+            //     MyUtil.errorLog("关闭程序");
+            // } else {
+            //     MyUtil.errorLog("关闭程序失败");
+            // }
+
+            // } while (!stopInvV2Flag || !disconnectFlag);
+
+        }
+
+        for (Thread thread : threadList) {
+            try {
+                thread.stop();
+            } catch (Exception e) {
+                MyUtil.errorLog("关闭线程失败");
+                e.printStackTrace();
+            }
         }
         }
+
     }
     }
 
 
     private CallBack.R2000 getCallBack(String userId, String sessionId, JSONObject data) {
     private CallBack.R2000 getCallBack(String userId, String sessionId, JSONObject data) {

+ 2 - 3
src/main/java/com/fjhx/service/impl/UdpOperation.java

@@ -7,7 +7,6 @@ import com.fjhx.constant.UdpConstant;
 import com.fjhx.entity.ConnectConfig;
 import com.fjhx.entity.ConnectConfig;
 import com.fjhx.service.Operation;
 import com.fjhx.service.Operation;
 import com.fjhx.utils.MyUtil;
 import com.fjhx.utils.MyUtil;
-import com.fjhx.utils.ThreadPoolManager;
 import com.fjhx.utils.Utility;
 import com.fjhx.utils.Utility;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 
 
@@ -43,13 +42,13 @@ public class UdpOperation implements Operation {
         this.connectConfigList = connectConfigList;
         this.connectConfigList = connectConfigList;
 
 
         for (ConnectConfig connectConfig : connectConfigList) {
         for (ConnectConfig connectConfig : connectConfigList) {
-            ThreadPoolManager.execute(() -> {
 
 
+            new Thread(() -> {
                 // 开启设备读取rfid
                 // 开启设备读取rfid
                 DatagramSocket datagramSocket = send(UdpConstant.START_READ_INSTRUCTION, connectConfig);
                 DatagramSocket datagramSocket = send(UdpConstant.START_READ_INSTRUCTION, connectConfig);
                 startMonitor(datagramSocket, userId, sessionId, data);
                 startMonitor(datagramSocket, userId, sessionId, data);
+            }).start();
 
 
-            });
         }
         }
 
 
     }
     }

+ 0 - 69
src/main/java/com/fjhx/utils/ThreadPoolManager.java

@@ -1,69 +0,0 @@
-package com.fjhx.utils;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * 线程池
- */
-public class ThreadPoolManager {
-
-    //通过ThreadPoolExecutor的代理类来对线程池的管理
-    private static ThreadPollProxy mThreadPollProxy;
-
-    //单列对象
-    private static ThreadPollProxy getThreadPollProxy() {
-        synchronized (ThreadPollProxy.class) {
-            if (mThreadPollProxy == null) {
-                mThreadPollProxy = new ThreadPollProxy(10, 20, 1000);
-            }
-        }
-        return mThreadPollProxy;
-    }
-
-    /**
-     * 提交线程
-     *
-     * @param r
-     */
-    public static void execute(Runnable r) {
-        getThreadPollProxy().execute(r);
-    }
-
-    //通过ThreadPoolExecutor的代理类来对线程池的管理
-    private static class ThreadPollProxy {
-        private ThreadPoolExecutor poolExecutor;//线程池执行者 ,java内部通过该api实现对线程池管理
-        private final int corePoolSize;
-        private final int maximumPoolSize;
-        private final long keepAliveTime;
-
-        public ThreadPollProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
-            this.corePoolSize = corePoolSize;
-            this.maximumPoolSize = maximumPoolSize;
-            this.keepAliveTime = keepAliveTime;
-        }
-
-        //对外提供一个执行任务的方法
-        public void execute(Runnable r) {
-            if (poolExecutor == null || poolExecutor.isShutdown()) {
-                poolExecutor = new ThreadPoolExecutor(
-                        //核心线程数量
-                        corePoolSize,
-                        //最大线程数量
-                        maximumPoolSize,
-                        //当线程空闲时,保持活跃的时间
-                        keepAliveTime,
-                        //时间单元 ,毫秒级
-                        TimeUnit.MILLISECONDS,
-                        //线程任务队列
-                        new LinkedBlockingQueue<>(),
-                        //创建线程的工厂
-                        Executors.defaultThreadFactory());
-            }
-            poolExecutor.execute(r);
-        }
-    }
-
-}