首页 热点资讯 义务教育 高等教育 出国留学 考研考公

如何使用Spring开发和监控线程池服务

发布网友 发布时间:2022-04-23 14:45

我来回答

1个回答

热心网友 时间:2022-05-05 06:59

线程池对执行同步或异步的任务很重要。本文展示如何利用Spring开发并监控线程池服务。创建线程池的其他两种方法已讲解过。

使用技术

JDK 1.6.0_21
Spring 3.0.5
Maven 3.0.2

第1步:创建Maven工程

下面是一个maven工程。(可以使用Maven或IDE的插件创建)。

第2步:添加依赖库

将Spring的依赖添加到Maven的pom.xml文件中。

<!-- Spring 3 dependencies: spring-core and spring-context.
     Both versions come from the ${spring.version} property, which is
     expected to be defined elsewhere in the pom (not shown here). -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>

使用下面的插件创建可执行jar包。

<!-- Runs the shade goal during the package phase to build the executable jar. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Entry point of the packaged application. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <!-- Spring namespace handler registrations, handled by the appending transformer. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <!-- Spring XML schema mappings, handled by the appending transformer. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

第3步:创建任务类

创建一个实现Runnable接口的新TestTask类。这个类表示要执行的任务。

package com.otv.task;

import org.apache.log4j.Logger;

/**
 * Sample task for the thread pool: logs a start message, sleeps for ten
 * seconds and then logs a completion message.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class TestTask implements Runnable {

    private static final Logger log = Logger.getLogger(TestTask.class);

    /** How long the simulated work sleeps, in milliseconds. */
    private static final long TASK_DURATION_MILLIS = 10000L;

    /** Name used in log messages and returned by {@link #toString()}; may be null. */
    String taskName;

    /** Creates a task without a name; set one later with {@link #setTaskName(String)}. */
    public TestTask() {
    }

    /**
     * Creates a named task.
     *
     * @param taskName name used in log messages
     */
    public TestTask(String taskName) {
        this.taskName = taskName;
    }

    /**
     * Simulates work by sleeping. If the thread is interrupted while sleeping,
     * the failure is logged together with its cause and the interrupt flag is
     * restored so the code running this task can still observe the interruption.
     */
    @Override
    public void run() {
        try {
            log.debug(this.taskName + " : is started.");
            Thread.sleep(TASK_DURATION_MILLIS);
            log.debug(this.taskName + " : is completed.");
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            log.error(this.taskName + " : is not completed!", e);
        }
    }

    /**
     * @return the task name, or the string "null" when no name has been set
     *         (never a null reference)
     */
    @Override
    public String toString() {
        return String.valueOf(getTaskName());
    }

    /**
     * @return the task name, possibly null
     */
    public String getTaskName() {
        return taskName;
    }

    /**
     * @param taskName name used in log messages
     */
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
}

第4步:创建TestRejectedExecutionHandler类

TestRejectedExecutionHandler类实现了RejectedExecutionHandler接口。如果没有空闲线程并且队列已满(超出容量限制),任务会被拒绝。这个类处理被拒绝的任务。

package com.otv.handler;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.log4j.Logger;

/**
 * Rejection handler that only records, at debug level, which task the
 * executor refused. The task itself is not retried and nothing is thrown.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class TestRejectedExecutionHandler implements RejectedExecutionHandler {

    private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);

    /**
     * Logs the string form of the rejected task.
     *
     * @param runnable the task that was rejected
     * @param executor the executor that rejected it (not used here)
     */
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        final String rejectedTask = runnable.toString();
        log.debug(rejectedTask + " : has been rejected");
    }
}

第5步:创建ITestThreadPoolExecutorService接口

创建ITestThreadPoolExecutorService接口。(译者注:这个接口的主要功能是通过设置的参数创建一个线程池)

package com.otv.srv;

import java.util.concurrent.ThreadPoolExecutor;

import com.otv.handler.TestRejectedExecutionHandler;

/**
 * Holds thread pool settings (pool sizes, keep-alive time, queue capacity,
 * rejection handler) and creates a {@link ThreadPoolExecutor}.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public interface ITestThreadPoolExecutorService {

    /**
     * @return a new thread pool
     */
    ThreadPoolExecutor createNewThreadPool();

    // ---- pool sizing ----

    /** @return the core pool size */
    int getCorePoolSize();

    /** @param corePoolSize the core pool size */
    void setCorePoolSize(int corePoolSize);

    /** @return the maximum pool size */
    int getMaxPoolSize();

    /** @param maximumPoolSize the maximum pool size */
    void setMaxPoolSize(int maximumPoolSize);

    // ---- idle thread keep-alive ----

    /** @return the keep-alive time */
    long getKeepAliveTime();

    /** @param keepAliveTime the keep-alive time */
    void setKeepAliveTime(long keepAliveTime);

    // ---- work queue ----

    /** @return the queue capacity */
    int getQueueCapacity();

    /** @param queueCapacity the queue capacity */
    void setQueueCapacity(int queueCapacity);

    // ---- rejection handling ----

    /** @return the rejection handler */
    TestRejectedExecutionHandler getTestRejectedExecutionHandler();

    /** @param testRejectedExecutionHandler the rejection handler */
    void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler);

}

第6步:创建TestThreadPoolExecutorService类

TestThreadPoolExecutorService类实现了ITestThreadPoolExecutorService接口(上一步创建的接口)。这个类可以创建一个新的线程池。

package com.otv.srv;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.otv.handler.TestRejectedExecutionHandler;

/**
 * Bean-style holder of thread pool settings that builds a
 * {@link ThreadPoolExecutor} backed by a bounded {@link ArrayBlockingQueue}.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class TestThreadPoolExecutorService implements ITestThreadPoolExecutorService {

    private int corePoolSize;
    private int maxPoolSize;
    /** Keep-alive time; passed to the executor in seconds. */
    private long keepAliveTime;
    private int queueCapacity;
    TestRejectedExecutionHandler testRejectedExecutionHandler;

    /**
     * Creates a new pool from the current property values. Every call returns
     * a fresh executor with its own work queue.
     *
     * @return the newly created executor
     * @throws IllegalArgumentException if the configured sizes, keep-alive time
     *         or queue capacity are rejected by the executor or queue constructors
     * @throws NullPointerException if no rejection handler has been set
     */
    public ThreadPoolExecutor createNewThreadPool() {
        // Typed queue instead of the raw type, so no unchecked conversion is needed.
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(getQueueCapacity());
        return new ThreadPoolExecutor(getCorePoolSize(),
                getMaxPoolSize(),
                getKeepAliveTime(),
                TimeUnit.SECONDS,
                workQueue,
                getTestRejectedExecutionHandler());
    }

    /** @return the core pool size */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /** @param corePoolSize the core pool size */
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    /** @return the maximum pool size */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /** @param maxPoolSize the maximum pool size */
    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    /** @return the keep-alive time in seconds */
    public long getKeepAliveTime() {
        return keepAliveTime;
    }

    /** @param keepAliveTime the keep-alive time in seconds */
    public void setKeepAliveTime(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /** @return the capacity of the work queue */
    public int getQueueCapacity() {
        return queueCapacity;
    }

    /** @param queueCapacity the capacity of the work queue */
    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    /** @return the handler given to new pools for rejected tasks, possibly null */
    public TestRejectedExecutionHandler getTestRejectedExecutionHandler() {
        return testRejectedExecutionHandler;
    }

    /** @param testRejectedExecutionHandler the handler given to new pools for rejected tasks */
    public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler) {
        this.testRejectedExecutionHandler = testRejectedExecutionHandler;
    }
}

第7步: 创建IThreadPoolMonitorService接口

创建IThreadPoolMonitorService接口

package com.otv.monitor.srv;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * A {@link Runnable} that monitors a {@link ThreadPoolExecutor}.
 */
public interface IThreadPoolMonitorService extends Runnable {

    /** Monitors the thread pool. */
    void monitorThreadPool();

    /** @return the executor being monitored */
    ThreadPoolExecutor getExecutor();

    /** @param executor the executor to monitor */
    void setExecutor(ThreadPoolExecutor executor);
}

第8步:创建ThreadPoolMonitorService类

ThreadPoolMonitorService类实现了IThreadPoolMonitorService接口。这个类用来监控已创建的线程池。

package com.otv.monitor.srv;

import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;

/**
 * Periodically logs the state of a {@link ThreadPoolExecutor} at debug level.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class ThreadPoolMonitorService implements IThreadPoolMonitorService {

    private static final Logger log = Logger.getLogger(ThreadPoolMonitorService.class);

    /** Executor whose statistics are logged; must be set before monitoring starts. */
    ThreadPoolExecutor executor;

    /** Pause between two snapshots, in seconds. */
    private long monitoringPeriod;

    /**
     * Logs a snapshot, sleeps for the monitoring period and repeats until the
     * thread is interrupted or an exception occurs. On interruption the
     * interrupt flag is restored before returning. Every failure is logged
     * with its stack trace.
     */
    public void run() {
        try {
            while (true) {
                monitorThreadPool();
                Thread.sleep(monitoringPeriod * 1000);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        } catch (Exception e) {
            // Pass the throwable too: getMessage() alone is often null (e.g. NPE).
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Logs one line with the current, core and maximum pool sizes, the active,
     * completed and total task counts, and whether the executor is terminated.
     *
     * @throws NullPointerException if no executor has been set
     */
    public void monitorThreadPool() {
        // Local, single-threaded buffer: StringBuilder avoids StringBuffer's locking.
        StringBuilder snapshot = new StringBuilder();
        snapshot.append("CurrentPoolSize : ").append(executor.getPoolSize());
        snapshot.append(" - CorePoolSize : ").append(executor.getCorePoolSize());
        snapshot.append(" - MaximumPoolSize : ").append(executor.getMaximumPoolSize());
        snapshot.append(" - ActiveTaskCount : ").append(executor.getActiveCount());
        snapshot.append(" - CompletedTaskCount : ").append(executor.getCompletedTaskCount());
        snapshot.append(" - TotalTaskCount : ").append(executor.getTaskCount());
        snapshot.append(" - isTerminated : ").append(executor.isTerminated());

        log.debug(snapshot.toString());
    }

    /** @return the monitored executor, possibly null */
    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    /** @param executor the executor to monitor */
    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /** @return the pause between snapshots, in seconds */
    public long getMonitoringPeriod() {
        return monitoringPeriod;
    }

    /** @param monitoringPeriod the pause between snapshots, in seconds */
    public void setMonitoringPeriod(long monitoringPeriod) {
        this.monitoringPeriod = monitoringPeriod;
    }
}

热心网友 时间:2022-05-05 06:59

线程池对执行同步或异步的任务很重要。本文展示如何利用Spring开发并监控线程池服务。创建线程池的其他两种方法已讲解过。

使用技术

JDK 1.6.0_21
Spring 3.0.5
Maven 3.0.2

第1步:创建Maven工程

下面是一个maven工程。(可以使用Maven或IDE的插件创建)。

第2步:添加依赖库

将Spring的依赖添加到Maven的pom.xml文件中。

<!-- Spring 3 dependencies: spring-core and spring-context.
     Both versions come from the ${spring.version} property, which is
     expected to be defined elsewhere in the pom (not shown here). -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>

使用下面的插件创建可执行jar包。

<!-- Runs the shade goal during the package phase to build the executable jar. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Entry point of the packaged application. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <!-- Spring namespace handler registrations, handled by the appending transformer. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <!-- Spring XML schema mappings, handled by the appending transformer. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

第3步:创建任务类

创建一个实现Runnable接口的新TestTask类。这个类表示要执行的任务。

package com.otv.task;

import org.apache.log4j.Logger;

/**
 * Sample task for the thread pool: logs a start message, sleeps for ten
 * seconds and then logs a completion message.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class TestTask implements Runnable {

    private static final Logger log = Logger.getLogger(TestTask.class);

    /** How long the simulated work sleeps, in milliseconds. */
    private static final long TASK_DURATION_MILLIS = 10000L;

    /** Name used in log messages and returned by {@link #toString()}; may be null. */
    String taskName;

    /** Creates a task without a name; set one later with {@link #setTaskName(String)}. */
    public TestTask() {
    }

    /**
     * Creates a named task.
     *
     * @param taskName name used in log messages
     */
    public TestTask(String taskName) {
        this.taskName = taskName;
    }

    /**
     * Simulates work by sleeping. If the thread is interrupted while sleeping,
     * the failure is logged together with its cause and the interrupt flag is
     * restored so the code running this task can still observe the interruption.
     */
    @Override
    public void run() {
        try {
            log.debug(this.taskName + " : is started.");
            Thread.sleep(TASK_DURATION_MILLIS);
            log.debug(this.taskName + " : is completed.");
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            log.error(this.taskName + " : is not completed!", e);
        }
    }

    /**
     * @return the task name, or the string "null" when no name has been set
     *         (never a null reference)
     */
    @Override
    public String toString() {
        return String.valueOf(getTaskName());
    }

    /**
     * @return the task name, possibly null
     */
    public String getTaskName() {
        return taskName;
    }

    /**
     * @param taskName name used in log messages
     */
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
}

第4步:创建TestRejectedExecutionHandler类

TestRejectedExecutionHandler类实现了RejectedExecutionHandler接口。如果没有空闲线程并且队列已满(超出容量限制),任务会被拒绝。这个类处理被拒绝的任务。

package com.otv.handler;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.log4j.Logger;

/**
 * Rejection handler that only records, at debug level, which task the
 * executor refused. The task itself is not retried and nothing is thrown.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class TestRejectedExecutionHandler implements RejectedExecutionHandler {

    private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);

    /**
     * Logs the string form of the rejected task.
     *
     * @param runnable the task that was rejected
     * @param executor the executor that rejected it (not used here)
     */
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        final String rejectedTask = runnable.toString();
        log.debug(rejectedTask + " : has been rejected");
    }
}

第5步:创建ITestThreadPoolExecutorService接口

创建ITestThreadPoolExecutorService接口。(译者注:这个接口的主要功能是通过设置的参数创建一个线程池)

package com.otv.srv;

import java.util.concurrent.ThreadPoolExecutor;

import com.otv.handler.TestRejectedExecutionHandler;

/**
 * Holds thread pool settings (pool sizes, keep-alive time, queue capacity,
 * rejection handler) and creates a {@link ThreadPoolExecutor}.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public interface ITestThreadPoolExecutorService {

    /**
     * @return a new thread pool
     */
    ThreadPoolExecutor createNewThreadPool();

    // ---- pool sizing ----

    /** @return the core pool size */
    int getCorePoolSize();

    /** @param corePoolSize the core pool size */
    void setCorePoolSize(int corePoolSize);

    /** @return the maximum pool size */
    int getMaxPoolSize();

    /** @param maximumPoolSize the maximum pool size */
    void setMaxPoolSize(int maximumPoolSize);

    // ---- idle thread keep-alive ----

    /** @return the keep-alive time */
    long getKeepAliveTime();

    /** @param keepAliveTime the keep-alive time */
    void setKeepAliveTime(long keepAliveTime);

    // ---- work queue ----

    /** @return the queue capacity */
    int getQueueCapacity();

    /** @param queueCapacity the queue capacity */
    void setQueueCapacity(int queueCapacity);

    // ---- rejection handling ----

    /** @return the rejection handler */
    TestRejectedExecutionHandler getTestRejectedExecutionHandler();

    /** @param testRejectedExecutionHandler the rejection handler */
    void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler);

}

第6步:创建TestThreadPoolExecutorService类

TestThreadPoolExecutorService类实现了ITestThreadPoolExecutorService接口(上一步创建的接口)。这个类可以创建一个新的线程池。

package com.otv.srv;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.otv.handler.TestRejectedExecutionHandler;

/**
 * Bean-style holder of thread pool settings that builds a
 * {@link ThreadPoolExecutor} backed by a bounded {@link ArrayBlockingQueue}.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class TestThreadPoolExecutorService implements ITestThreadPoolExecutorService {

    private int corePoolSize;
    private int maxPoolSize;
    /** Keep-alive time; passed to the executor in seconds. */
    private long keepAliveTime;
    private int queueCapacity;
    TestRejectedExecutionHandler testRejectedExecutionHandler;

    /**
     * Creates a new pool from the current property values. Every call returns
     * a fresh executor with its own work queue.
     *
     * @return the newly created executor
     * @throws IllegalArgumentException if the configured sizes, keep-alive time
     *         or queue capacity are rejected by the executor or queue constructors
     * @throws NullPointerException if no rejection handler has been set
     */
    public ThreadPoolExecutor createNewThreadPool() {
        // Typed queue instead of the raw type, so no unchecked conversion is needed.
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(getQueueCapacity());
        return new ThreadPoolExecutor(getCorePoolSize(),
                getMaxPoolSize(),
                getKeepAliveTime(),
                TimeUnit.SECONDS,
                workQueue,
                getTestRejectedExecutionHandler());
    }

    /** @return the core pool size */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /** @param corePoolSize the core pool size */
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    /** @return the maximum pool size */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /** @param maxPoolSize the maximum pool size */
    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    /** @return the keep-alive time in seconds */
    public long getKeepAliveTime() {
        return keepAliveTime;
    }

    /** @param keepAliveTime the keep-alive time in seconds */
    public void setKeepAliveTime(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /** @return the capacity of the work queue */
    public int getQueueCapacity() {
        return queueCapacity;
    }

    /** @param queueCapacity the capacity of the work queue */
    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    /** @return the handler given to new pools for rejected tasks, possibly null */
    public TestRejectedExecutionHandler getTestRejectedExecutionHandler() {
        return testRejectedExecutionHandler;
    }

    /** @param testRejectedExecutionHandler the handler given to new pools for rejected tasks */
    public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler) {
        this.testRejectedExecutionHandler = testRejectedExecutionHandler;
    }
}

第7步: 创建IThreadPoolMonitorService接口

创建IThreadPoolMonitorService接口

package com.otv.monitor.srv;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * A {@link Runnable} that monitors a {@link ThreadPoolExecutor}.
 */
public interface IThreadPoolMonitorService extends Runnable {

    /** Monitors the thread pool. */
    void monitorThreadPool();

    /** @return the executor being monitored */
    ThreadPoolExecutor getExecutor();

    /** @param executor the executor to monitor */
    void setExecutor(ThreadPoolExecutor executor);
}

第8步:创建ThreadPoolMonitorService类

ThreadPoolMonitorService类实现了IThreadPoolMonitorService接口。这个类用来监控已创建的线程池。

package com.otv.monitor.srv;

import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;

/**
 * Periodically logs the state of a {@link ThreadPoolExecutor} at debug level.
 *
 * @author onlinetechvision.com
 * @since 17 Oct 2011
 * @version 1.0.0
 */
public class ThreadPoolMonitorService implements IThreadPoolMonitorService {

    private static final Logger log = Logger.getLogger(ThreadPoolMonitorService.class);

    /** Executor whose statistics are logged; must be set before monitoring starts. */
    ThreadPoolExecutor executor;

    /** Pause between two snapshots, in seconds. */
    private long monitoringPeriod;

    /**
     * Logs a snapshot, sleeps for the monitoring period and repeats until the
     * thread is interrupted or an exception occurs. On interruption the
     * interrupt flag is restored before returning. Every failure is logged
     * with its stack trace.
     */
    public void run() {
        try {
            while (true) {
                monitorThreadPool();
                Thread.sleep(monitoringPeriod * 1000);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        } catch (Exception e) {
            // Pass the throwable too: getMessage() alone is often null (e.g. NPE).
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Logs one line with the current, core and maximum pool sizes, the active,
     * completed and total task counts, and whether the executor is terminated.
     *
     * @throws NullPointerException if no executor has been set
     */
    public void monitorThreadPool() {
        // Local, single-threaded buffer: StringBuilder avoids StringBuffer's locking.
        StringBuilder snapshot = new StringBuilder();
        snapshot.append("CurrentPoolSize : ").append(executor.getPoolSize());
        snapshot.append(" - CorePoolSize : ").append(executor.getCorePoolSize());
        snapshot.append(" - MaximumPoolSize : ").append(executor.getMaximumPoolSize());
        snapshot.append(" - ActiveTaskCount : ").append(executor.getActiveCount());
        snapshot.append(" - CompletedTaskCount : ").append(executor.getCompletedTaskCount());
        snapshot.append(" - TotalTaskCount : ").append(executor.getTaskCount());
        snapshot.append(" - isTerminated : ").append(executor.isTerminated());

        log.debug(snapshot.toString());
    }

    /** @return the monitored executor, possibly null */
    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    /** @param executor the executor to monitor */
    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /** @return the pause between snapshots, in seconds */
    public long getMonitoringPeriod() {
        return monitoringPeriod;
    }

    /** @param monitoringPeriod the pause between snapshots, in seconds */
    public void setMonitoringPeriod(long monitoringPeriod) {
        this.monitoringPeriod = monitoringPeriod;
    }
}

声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com