import javax.websocket.server.ServerEndpoint;
//该注解用来指定一个URI,客户端可以通过这个URI来连接到WebSocket。类似Servlet的注解mapping。无需在web.xml中配置。
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer;
/**
 * WebSocket server endpoint that keeps every open endpoint in a shared set and, on open,
 * consumes from a RabbitMQ queue and forwards each delivery to its own client session.
 *
 * NOTE(review): this listing looks damaged by extraction. String literals are cut off
 * after a lone backslash, and on several lines code follows a line comment on the same
 * line, so it is commented out as written. Restore those parts from the original project
 * before compiling; the code tokens below are left exactly as found.
 */
@ServerEndpoint(value = \@Component
public class MyWebSocket { // Static variable recording the current number of online connections; it should be made thread-safe. private static int onlineCount = 0;
// Thread-safe Set from the concurrent package, holding the MyWebSocket object of each client. private static CopyOnWriteArraySet
// Connection session with one client; data is sent to the client through it. private Session session;
// NOTE(review): static, so a single value is shared by every endpoint instance; onOpen uses it as a queue binding key.
public static String username = null;
private static final String EXCHANGE_NAME = \ /**
* Method called when a connection is established successfully*/ @OnOpen
public void onOpen(@PathParam(\ this.session = session;
webSocketSet.add(this); // add to the set addOnlineCount(); // increment the online count
System.out.println(\有新连接加入!当前在线人数为\ try {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\
Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // declare a direct-type exchange
channel.exchangeDeclare(EXCHANGE_NAME, \
String queueName = channel.queueDeclare().getQueue();
// specify the binding_key
channel.queueBind(queueName, EXCHANGE_NAME, username); channel.queueBind(queueName, EXCHANGE_NAME, \
System.out.println(\
// NOTE(review): the loop below has no break or return; it only ends when an exception reaches the outer catch,
// so onOpen does not return while deliveries keep arriving, and connection/channel are never closed here.
// NOTE(review): QueueingConsumer is believed to be deprecated/removed in newer amqp-client releases - confirm the client version.
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody());
// Walks the whole set but sends only to the entry that equals this endpoint; an IOException is printed and the loop moves on.
System.out.println(MyWebSocket.webSocketSet.size()+\ for(MyWebSocket item: MyWebSocket.webSocketSet){ if(item.equals(this)){ try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } }
} }
} catch (Exception e) {
System.out.println(\异常\ } }
/**
* Method called when the connection is closed */
@OnClose
public void onClose() {
webSocketSet.remove(this); // remove from the set subOnlineCount(); // decrement the online count
System.out.println(\有一连接关闭!当前在线人数为\ }
/**
* Method called after a message is received from the client; the message is sent to every endpoint in webSocketSet *
* @param message the message sent by the client*/ @OnMessage
public void onMessage(String message, Session session) {
System.out.println(\来自客户端的消息:\
// broadcast the message
for (MyWebSocket item : webSocketSet) { try {
item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } }
/**
* Called when an error occurs; prints the stack trace of the error */
@OnError
public void onError(Session session, Throwable error) { System.out.println(\发生错误\ error.printStackTrace(); }
public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); }
/**
* Broadcast a custom message to every endpoint in webSocketSet; an IOException from one endpoint is ignored and the loop continues * */
public static void sendInfo(String message) throws IOException { for (MyWebSocket item : webSocketSet) { try {
item.sendMessage(message); } catch (IOException e) { continue; } } }
// Synchronized static accessors for the shared onlineCount counter.
public static synchronized int getOnlineCount() { return onlineCount; }
public static synchronized void addOnlineCount() { MyWebSocket.onlineCount++; }
public static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }
8.编写GetHttpSessionConfigurator
package com.websocket;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
/**
 * Handshake configurator that hands the HTTP session of the upgrade request to the
 * WebSocket endpoint through the endpoint configuration's user properties.
 */
public class GetHttpSessionConfigurator extends ServerEndpointConfig.Configurator {

    /**
     * Stores the handshake's {@link HttpSession}, when one exists, in the user properties
     * under the key {@code HttpSession.class.getName()}.
     *
     * @param config   endpoint configuration whose user properties receive the session
     * @param request  opening handshake request the session is read from
     * @param response opening handshake response (not used)
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        // getHttpSession() returns null when the handshake is not tied to an HTTP session.
        // Some containers back the user-properties map with a ConcurrentHashMap, which
        // rejects null values with a NullPointerException, so only store a real session.
        if (httpSession != null) {
            config.getUserProperties().put(HttpSession.class.getName(), httpSession);
        }
    }
}
10.编写WebSocketConfig
package com;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration class that declares the {@link ServerEndpointExporter} bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter} instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
11.当然, Erlang和RabbitMQ服务要提前安装好。
在运行之前我们要启动该服务,
12.双击即可
13.之后就可以启动项目,进行消息的生产和消费了。