没长正的技术专栏 勤动手、多思考

初识高并发

2018-08-01

阅读:


初始多线程及原理

  1. 多线程的发展历史
  2. 线程的应用
  3. 并发编程的基础
  4. 线程安全问题

1.什么情况下应该使用多线程

​ 线程出现的目的是什么?解决进程中多任务的实时性问题?其实简单来说,也 就是解决“阻塞”的问题,阻塞的意思就是程序运行到某个函数或过程后等待某些事件发生而暂时停止 CPU 占用的情况,也就是说会使得 CPU 闲置。还有一些场景就是比如对于一个函数中的运算逻辑的性能问题,我们可以通过多线程的技术,使得一个函数中的多个逻辑运算通过多线程技术达到一个并行执行,从而提升性能所以,多线程最终解决的就是“等待”的问题,所以简单总结的使用场景。

  • 通过并行计算提高程序执行性能
  • 需要等待网络、I/O响应导致耗费大量的执行时间,可以采用异步线程的方 式来减少阻塞

1.1 tomcat7 以前的 io 模型(应用场景)

  • 客户端阻塞 如果客户端只有一个线程,这个线程发起读取文件的操作必须等待 IO 流返回,线程(客户端)才能做其他的事
  • 线程级别阻塞 BIO 客户端一个线程情况下,一个线程导致整个客户端阻塞。 那么我们可以使用多线程,一部分线程在等待 IO 操作返回其他线程可以继续做其他的事。此时从客户端角度来说,客户端没有闲着。

1.2 项目应用场景

​ 并行是利用多核,并发是利用CPU时间分片

  • 并行
    • Java 8 的 Stream 流式处理 parallelStream
    • 多线程:利用阻塞队列和多线程,异步处理提高性能。(线下采集:文件上传、解析、入库) csv 格式 与 load方式入库
  • 并发
    • 多线程:与上面相同。(销售数据管理平台、回调机制)

    • 接口聚合

      @Slf4j
      @Component
      public class MyFutureTask {
          @Resource
          UserService userService;
          /**
           * 核心线程 8 最大线程 20 保活时间30s 存储队列 10 有守护线程 
           * 拒绝策略:将超负荷任务回退到调用者
           */
          private static ExecutorService executor = new ThreadPoolExecutor(8, 20,
                  30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
                  new ThreadFactoryBuilder().setNameFormat("User_Async_FutureTask-%d").setDaemon(true).build(),
                  new ThreadPoolExecutor.CallerRunsPolicy());
          
          @SuppressWarnings("all")
          public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {
          
              System.out.println("MyFutureTask的线程:" + Thread.currentThread());
          
              long fansCount = 0, msgCount = 0, collectCount = 0,
                      followCount = 0, redBagCount = 0, couponCount = 0;
          
      //        fansCount = userService.countFansCountByUserId(userId);
      //        msgCount = userService.countMsgCountByUserId(userId);
      //        collectCount = userService.countCollectCountByUserId(userId);
      //        followCount = userService.countFollowCountByUserId(userId);
      //        redBagCount = userService.countRedBagCountByUserId(userId);
      //        couponCount = userService.countCouponCountByUserId(userId);
          
              try {
                  Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
                  Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
                  Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
                  Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
                  Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
                  Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));
          
                  //get阻塞
                  fansCount = fansCountFT.get();
                  msgCount = msgCountFT.get();
                  collectCount = collectCountFT.get();
                  followCount = followCountFT.get();
                  redBagCount = redBagCountFT.get();
                  couponCount = couponCountFT.get();
          
              } catch (InterruptedException | ExecutionException e) {
                  e.printStackTrace();
                  log.error(">>>>>>聚合查询用户聚合信息异常:" + e + "<<<<<<<<<");
              }
              UserBehaviorDataDTO userBehaviorData =
                      UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
                              .collectCount(collectCount).followCount(followCount)
                              .redBagCount(redBagCount).couponCount(couponCount).build();
              return userBehaviorData;
          }
      }
          
      //串行:50多s
      //异步并行:10s
      

2.如何应用线程

在 Java 中,有多种方式来实现多线程。继承 Thread 类、实现 Runnable 接 口、使用 ExecutorService、Callable、Future 实现带返回结果的多线程。

2.1 继承 Thread 类创建线程

Thread 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例。 启动线程的唯一方法就是通过 Thread 类的 start()实例方法。 start()方法是一个 native 方法,它会启动一个新线程,并执行 run()方法,没有返回值。 这种方式实现多线程很 简单,通过自己的类直接 extend Thread,并复写 run()方法,就可以启动新线 程并执行自己定义的 run()方法。

public class MyThread extends Thread {    
        public void run() {
            System.out.println("MyThread.run()");
        }
}
MyThread myThread1 = new MyThread();
MyThread myThread2 = new MyThread();
myThread1.start();
//join():等待myThread1执行完成,才执行下面的操作
myThread1.join();

myThread2.start();
//join(long)  等待指定时间后,不管是否完成都会进行下一步操作。
myThread2.join(100);
System.out.println("over");

2.2 实现 Runnable 接口创建线程

如果自己的类已经 extends 另一个类,就无法直接 extends Thread,此时, 可以实现一个 Runnable 接口,没有返回值

public class MyThread extends OtherClass implements Runnable {
    public void run() {
        System.out.println("MyThread.run()");
    }
}

2.3 实现 Callable 接口通过 FutureTask 包装器来创建 Thread 线程

有的时候,我们可能需要让一步执行的线程在执行完成以后,提供一个返回值 给到当前的主线程,主线程需要依赖这个值进行后续的逻辑处理,那么这个时 候,就需要用到带返回值的线程了。Java 中提供了这样的实现方式 有返回值

public class CallableDemo implements Callable<String> {
    
    public static void main(String[] args) throws ExecutionException,
    InterruptedException {
        ExecutorService executorService=
        Executors.newFixedThreadPool(1);
        CallableDemo callableDemo=new CallableDemo();
        Future<String> future=executorService.submit(callableDemo);
        //future.get() 主线程同步阻塞,等待完成
        System.out.println(future.get());
        executorService.shutdown();
    }
    
    @Override
    public String call() throws Exception {
        int a=1;
        int b=2;
        System.out.println(a+b);
        return "执行结果:"+(a+b);
    }
}
 

2.4CompletableFuture 实现

参考文档: https://blog.csdn.net/finalheart/article/details/87615546

https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650

Future使用Future获取返回结果,1. 通过get阻塞获取 2. 通过轮询方式,判断isDone是否为true;会阻塞主线程。

CompletableFuture:针对Future做了优化,可传入回调对象,当异步任务完成或异常时,调用回调对象的回调方法。

常用的模式:Supplier(生产者 get)、 Customer (消费者 accept) 、Function(方法 T, R)、 Predicate(断言)

2.5 如何把多线程用的更加优雅

​ 合理的利用异步操作,可以大大提升程序的处理性能,下面这个案例, 如果看过 zookeeper 源码的同学应该都见过,通过阻塞队列以及多线程的方式,实现对请求的异步化处理,提升处理性能。

2.6 为什么Thread线程启动不用run方法,而是用start方法?

  • run方法只是一个回调方法;
  • Thread 的实例化,需要调用操作系统的Thread方法才能生效;
  • 不同的操作系统Thread的实现方式不一样,线程的执行需要CPU去调度;
  • 核心: Thread.start() -> 各种初始化 os::start() -> CPU调度算法 -> run()回调

3. Java 并发编程的基础

线程作为操作系统调度的最小单元,并且能够让多线程同时执行,极大的提高 了程序的性能,在多核环境下的优势更加明显。但是在使用多线程的过程中, 如果对它的特性和原理不够理解的话,很容易造成各种问题。

2021-09-02_线程状态

来源

  • 线程的状态(参考JDK1.8 Thread类) * @since 1.5 Java 线程既然能够创建,那么也势必会被销毁,所以线程是存在生命周期的, 那么我们接下来从线程的生命周期开始去了解线程。 线程一共有 6 种状态(NEW、RUNNABLED、BLOCKED、WAITING、 TIME_WAITING、TERMINATED)

    常见方法:start、join、wait、notity、notifyAll、yield(很少用:现在都采用Lock实现该功能)、LockSupport.park、LockSupport.parkNanos、LockSupport.parkUnit、LockSupport.unpark(Thread)

    • NEW:初始状态,线程被构建,但是还没有调用 start 方法
    • RUNNABLED:运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一 称为“运行中”
    • BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃 了 CPU 使用权,阻塞也分为几种情况

      • 等待阻塞:运行的线程执行wait方法,jvm会把当前线程放入到等待队列
      • 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占 用了,那么 jvm 会把当前的线程放入到锁池中
      • 其他阻塞:运行的线程执行Thread.sleep或者t.join方法,或者发出了I/O 请求时,JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、 io 处理完毕则线程恢复
    • WAITING:线程等待状态
    • TIME_WAITING:超时等待状态,超时以后自动返回
    • TERMINATED:终止状态,表示当前线程执行完毕
  • 通过相应命令显示线程状态
    • 打开终端或者命令提示符,键入“jps”,(JDK1.5 提供的一个显示当前所有 java 进程 pid 的命令),可以获得相应进程的 pid
    • 根据上一步骤获得的 pid,继续输入 jstack pid(jstack 是 java 虚拟机自带的 一种堆栈跟踪工具。jstack 用于打印出给定的 java 进程 ID 或 core file 或远程 调试服务的 Java 堆栈信息)
  • 线程的停止

    ​ 线程的启动过程大家都非常熟悉,但是如何终止一个线程,我相信绝大部分人,在面试的时候被问到这个问题时,也会不知所措,不知道怎么回答。 ​ 记住,线程的终止,并不是简单的调用 stop 命令去。虽然 api 仍然可以调用, 但是和其他的线程控制方法如 suspend、resume 一样都是过期了的不建议使 用,就拿 stop 来说,stop 方法在结束一个线程时并不会保证线程的资源正常 释放,因此会导致程序可能出现一些不确定的状态。 要优雅的去中断一个线程,在线程中提供了一个 interrupt 方法

  • interrupt 方法 当其他线程通过调用当前线程的 interrupt 方法,表示向当前线程打个招呼, 告诉他可以中断线程的执行了,至于什么时候中断,取决于当前线程自己。 线程通过检查资深是否被中断来进行相应,可以通过 isInterrupted()来判断是 否被中断。 通过下面这个例子,来实现了线程终止的逻辑
public class InterruptDemo {
    private static int i;
    public static void main(String[] args) throws
    InterruptedException {
        Thread thread=new Thread(()->{
        while(!Thread.currentThread().isInterrupted()){
            i++;
        }
            System.out.println("Num:"+i);
        },"interruptDemo");
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        //线程中断
        thread.interrupt();
    }
}

这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源, 而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅

  • Thread.interrupted 上面的案例中,通过 interrupt,设置了一个标识告诉线程可以终止了,线程中 还提供了静态方法 Thread.interrupted()对设置中断标识的线程复位。比如在 上面的案例中,外面的线程调用 thread.interrupt 来设置中断标识,而在线程 里面,又通过 Thread.interrupted 把线程的标识又进行了复位
public class InterruptDemo {
public static void main(String[] args) throws
InterruptedException{
    Thread thread=new Thread(()->{
        while(true){
            boolean ii=Thread.currentThread().isInterrupted();
            if(ii){
                System.out.println("before:"+ii);
                Thread.interrupted();//对线程进行复位,中断标识为
                false
                System.out.println("after:"+Thread.currentThread()
                .isInterrupted());
            }
        }
    });
    thread.start();
    TimeUnit.SECONDS.sleep(1);//TIME_WAITING
    thread.interrupt();//设置中断标识,中断标识为 true
    }
}

  • 其他的线程复位 除了通过 Thread.interrupted 方法对线程中断标识进行复位以外,还有一种被动复位的场景,就是对抛出 InterruptedException 异常的方法,在 InterruptedException 抛出之前,JVM 会先把线程的中断标识位清除,然后才 会抛出 InterruptedException,这个时候如果调用 isInterrupted 方法,将会 返回 false
public class InterruptDemo {
    public static void main(String[] args) throws
    InterruptedException{
        Thread thread=new Thread(()->{
            while(true){
                try {
                Thread.sleep(10000);
                } catch (InterruptedException e) {
                //抛出该异常,会将复位标识设置为 false
                e.printStackTrace();
                }
            }
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();//设置复位标识为 true
        TimeUnit.SECONDS.sleep(1);
        System.out.println(thread.isInterrupted());//false
    }
}
 

有同学在问线程为什么要复位?首先我们来看看线程执行 interrupt 以后的源 码是做了什么?

thread.cpp

void Thread::interrupt(Thread* thread) {
    trace("interrupt", thread);
    debug_only(check_for_dangling_thread_pointer(thread);)
	os::interrupt(thread);
}

os_linux.cpp

void os::interrupt(Thread* thread) { assert(Thread::current() == thread ||
    Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");
    OSThread* osthread = thread->osthread();
    if (!osthread->interrupted()) { osthread->set_interrupted(true);
    // More than one thread can get here with the same value of
    osthread,
    // resulting in multiple notifications. We do, however, want the
    store
    // to interrupted() to be visible to other threads before we
    execute unpark().
    OrderAccess::fence();
    ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ;
    }
}

其实就是通过 unpark 去唤醒当前线程,并且设置一个标识位为 true。 并没有 所谓的中断线程的操作,所以实际上,线程复位可以用来实现多个线程之间的 通信。

  • 线程的停止方法之 2

public class VolatileDemo {
private volatile static boolean stop=false;
    public static void main(String[] args) throws
        InterruptedException {
        Thread thread=new Thread(()->{
            int i=0;
            while(!stop){
                i++;
            }
        });
        thread.start();
        System.out.println("begin start thread");
        Thread.sleep(1000);
        stop=true;
    }
}
 

参考:

GP-Mic

ConcurrentHashMap线程安全吗

论异步编程的正确姿势:十个接口的活现在只需要一个接口就能搞定!

廖雪峰 CompletableFuture


欢迎拍砖,多多交流,转载请注明出处:[没长正的技术专栏](http://blog.meizhangzheng.com) 如涉及侵权问题,请发送邮件到xsj34567@163.com,如情况属实本人将会尽快删除。


下一篇 分布式

Comments

Content