熟悉时间同步服务,xxl-job 等调度时可能会出现异常。
【2019-12-31】
说明:
1. 以master为主 (master是按照网络时间为主: ntp.org)
2. slave1、slave2 是以master时间为准
sudo yum install ntp
sudo systemctl start ntpd
sudo systemctl status ntpd
1.修改ntp.conf配置
sudo vi /etc/ntp.conf
2.把配置文件中默认的配置注释掉
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
3.在ntp.conf 新增配置 (以master时间为准)
server master iburst
4.重启ntp服务
sduo systemctl start ntpd
watch ntpq -p
熟悉虚拟机克隆、网络配置及免密钥登陆
【2019-12-30】
1. 选择“master”虚拟机 --> 管理 --> 克隆完整.. --> 重命名(slave1、slave2) --> 选择与master 同样的目录文件(slave1、slave2)
2.1 更改网卡IP (参考1) /etc/sysconfig/network-scripts
2.2 更爱mac地址 通过命令获取:ifconfig
2.3 更改UUID 通过命令获取:nmcli con show
2.4 更改主机名 /etc/hostname
3. ssh 免秘钥登录(守护进程的启停,免秘钥登录)
3.1 各个节点在meizhangzheng用户下执行命令:
ssh-keygen -t rsa
touch authorized_keys # 创建授权文件
3.2 master 节点中将公钥id_rsa.pub拷贝到authorized_keys文件
cat id_rsa.pub >> ~/.ssh/authorized_keys
3.3 给authorized_key文件授权
chmod 600 authorized_keys # 读写权限
3.4 将authorized_key文件复制到slave1中
scp authorized_keys meizhangzheng@slave1:~/.ssh
同理在slave1中
cd ~/.ssh
cat id_rsa.pub >> authorized_keys
scp authorized_keys meizhanghzneg@slave2:~/.ssh
在slave2中
cd ~/.ssh
# 将mster 、slave1、slave2中的key全都合并到了slave2的
# authorized_keys文件中了
cat id_rsa.pub >> authorized_keys
3.5 将slave2合并后的authorized_keys文件拷贝到master、slave1
scp authorized_keys meizhangzheng@master:~/.ssh
scp authorized_keys meizhangzheng@slave1:~/.ssh
3.6 验证是否成功(在master虚拟机执行命令)
ssh slave1 # 不需要密码直接可登录
连接的是root用户,切换到meizhangzheng用户: sudo su meizhangzheng
核心:熟悉Hadoop集群搭建、网络规划及环境配置
【2019-12-14】 软件地址下载
略…
start:
硬件配置
根据实际情况分配内存、硬盘
master: 4G、40G meizhangzheng/mzz123 192.168.1.10
slave1: 2G、40G meizhangzheng/mzz123 192.168.1.20
slave2: 2G、40G meizhangzheng/mzz123 192.168.1.30
虚拟机网络设置
NAT8、不自动分配IP、 DHCP 设置
子网:192.168.1.0
掩码:255.255.255.0
网关:192.168.1.2 (NAT配置)
(可创建桌面应用:选择 GUNE)
给meizhangzheng用户设置root下的命令
#在root用户下(su - root 设置密码)
chmod u+w /etc/sudoers
vi /etc/sudoers
meizhangzheng ALL=(ALL) NOPASSWORD:ALL
(系统上的某些应用程序可能实际上不支持此安全机制)
可以使用meizhangzhengyong用户设置SELinux信息 (具体设置参数,有示例) ``` sudo vi /etc/sysconfig/selinux
SELINUX=disabled
#### 4.3 IP设置
```shell
# 4.3.1 通过ifconfig 查看网卡名称:ens33 ,进入网卡配置路径
cd /etc/sysconfig/network-scripts/
# 4.3.2 创建网卡
vi ifcfg-ens33
# 4.3.3 编辑网卡信息 (注意: **NAME** 、 **DEVICE** 、UUID、
IPADDR、IPADDR、GATEWAY、DNS1、 **HWADDR** ONBOOT=YES 的配置:
信息可以从ifconfig中查看)
TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=none
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=5cf61487-6084-4445-9cca-c5da7205e984
DEVICE=ens33
ONBOOT=YES
IPADDR=192.168.1.10
PREFIX=24
GATEWAY=192.168.1.2
DNS1=8.8.8.8
HWADDR=00:0C:29:12:42:E2
# 4.3.4 重启网卡信息
sudo systemctl restart network
(防止因为防火墙问题,导致集群间通讯问题)
# 4.4.1 查看防火墙状态
sudo systemctl staus firewalld.service
# 4.4.2 关闭防火墙服务
sudo systemctl stop firewalld.service
# 4.4.3 禁止firewall开机启动
sudo systemctl disable firewalld.service
(master /node /node 间通讯)
# 4.5.1 查看ssh服务状态
sudo systemctl status sshd.service
# 4.5.2 启动ssh服务 (running)
sudo systemctl start sshd.service
# 4.5.3 设置开机启动ssh服务
sudo systemctl enable sshd.service
文件信息: /etc/hosts
# 4.6.1 配置hosts
vi /etc/hosts
192.168.1.10 master
192.168.1.20 slave1
192.168.1.30 slave2
配置主机名( 需要与与IP对应)
sudo vi /etc/hostname # 添加内容如下:
master
节点间时间同步
4.8.1 配置ntp服务
yum install ntpdate (所有节点)
export JAVA_HOME=/home/meizhangzheng/jdk1.8.0_231
export CLASSPATH=.:$JAVA_HOME/lib/jd.jar;$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
hive 是基于 Hadoop 的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。
Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。
别名: 与 MySQL 有区别
MySQL: '' 引号
Hive ``
SELECT
week_name AS `经营周`,
start_time_string AS `开始时间`,
end_time_string AS `结束时间`,
device_code AS `设备名称`,
project_name AS `楼盘名称`,
city_name AS `城市名称`,
area_name AS `城市区域`,
address AS `地址`,
surplus_rate AS `余量上刊率`,
cash_rate AS `现金上刊率`,
substitution_rate AS `置换上刊率`,
give_rate AS `赠播上刊率`,
comprehensive_rate AS `综合上刊率`
FROM
(
SELECT
*,
row_number ( ) over ( ORDER BY week_id, device_code ) rn
FROM
dm2.dm2_pv_result_point_rate
WHERE
week_id >= 2019011
AND week_id <= 2020122 AND cus_type = 0 AND device_type = 1 AND cash_rate >= 0
AND cash_rate < 9999.0
) bb
# 1.删除表
drop table if exists passwords;
drop table if exists grpshell;
# 2.创建表
create table if not exists passwords (username string, passwd string, uid int, gid int, userinfo string, home string, shell string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ':' LINES TERMINATED BY '10';
# 3.加载数据(建立关联)
load data inpath '${hiveconf:inputFile}' into table passwords;
# 4.使用已有表数据复制到新表
create table if not exists grpshell (shell string, count int);
INSERT OVERWRITE TABLE grpshell SELECT p.shell, count(*) FROM passwords p GROUP BY p.shell;
# 5.创建分区 (基于具体的业务场景)
-- 通过 partitioned by 创建 父目录、子目录 (区分动态分区、静态分区 内部表与外部表)
create table if not exists table_name (
c1 string comment 'c1',
c2 string comment 'c2',
c3 string comment 'c3'
) comment 'table_name' partitioned by (month_id string,day_id string)
-- 可以定义行分隔符 row format delimited fields terminated by '|'
stored as textfile -- parquet orc text;
-- 6.创建外部表 (external)
-- 通过 partitioned by 创建 父目录、子目录 (区分动态分区、静态分区 内部表与外部表)
create external table if not exists table_name (
c1 string comment 'c1',
c2 string comment 'c2',
c3 string comment 'c3'
) comment 'table_name' partitioned by (month_id string,day_id string)
-- 可以定义行分隔符 row format delimited fields terminated by '|'
stored as textfile -- parquet orc text;
# 7.删除分区
-- 区分删除外部分区(删分区,只删除元数据,不删除目录及数据),内部表则会删除元数据,也会删除相应的目录和数据
alter table table_name drop partition(month_id='',day_id='')
-- 查看分区
show partitions table_name;
-- 动态、静态分区适用场景
-- 数据量不大,一般适用动态分区
--注意:以下设置,只在当前的会话窗口有效
-- 1.打开动态分区模式:
set hive.exec.dynamic.partition=true;
-- 2.设置分区模式为非严格模式
set hive.exec.dynamic.partition.mode=nonstrict;
# 8.区分insert overwrite /insert into
-- 覆盖插入 (覆盖202309/22目录下的所有数据)
insert overwrite table table_name partition(month_id='202309',day_id='22')
select * from source_table_name;
-- 追加插入 (插入到静态分区父目录为202309,子目录为22下)
insert into table table_name partition(month_id='202309',day_id='22')
select * from source_table_name;
```sql
– 1. 时间函数
select current_date;
select current_timestamp;
select date_sub(current_date,1);
select date_add(current_date,1); – 时间差值 select date_diff(current_date,current_date);
大数据工具资源列表
快速获取工具
了解已使用版本及最新版本
VM :https://pan.baidu.com/s/1pAifO88rBZ4KUsmAcJd4lg 提取码:bczq
CenOS 7 :https://www.centos.org/download/ – 7.7.1908
http://mirrors.163.com/centos/7.7.1908/isos/x86_64/
【OpenJDK 8】: https://adoptopenjdk.net/
【Scala 2.13.0】 : https://www.scala-lang.org/download/all.html
【Spark 2.4.5】: https://mirror.bit.edu.cn/apache/spark/spark-2.4.5/spark-2.4.5-bin-without-hadoop.tgz
【Kafak 2.5.0】:https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
【Flume 1.9.0】http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
核心:通过ssh 连接 Linux服务器,实现运维相关工作,(执行脚本、上传、下载等)
JSch 是SSH2的一个纯Java实现。它允许你连接到一个sshd 服务器,使用端口转发,X11转发,文件传输等等。你可以将它的功能集成到你自己的 程序中。同时该项目也提供一个J2ME版本用来在手机上直连SSHD服务器。
环境配置类
@Configuration
@Data
public class JSchConnectConfig implements Serializable {
/**
* 是否密码登录
*/
@Value("${jsch.passwordLogin:false}")
private Boolean passwordLogin;
/**
* 主机
*/
@Value("${jsch.host:192.168.83.165}")
private String host;
/**
* 端口
*/
@Value("${jsch.port:22}")
private Integer port;
/**
* 用户名
*/
@Value("${jsch.userName:omm}")
private String userName;
/**
* 密码
*/
@Value("${jsch.password:password}")
private String passWord;
/**
* 私钥
*/
@Value("${jsch.privateKey:privateKey}")
private String privateKey;
/**
* 30分钟
* 超时时间 ms (1000 * 60 * 30)
*/
@Value("${jsch.timeout:1800000}")
private Integer timeout;
@Value("${spring.profiles.active:dev}")
private String active;
}
JSchUtil 工具类
import com.jcraft.jsch.*;
import com.xinchao.dataschedule.web.config.JSchConnectConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.*;
import java.util.List;
import java.util.Properties;
@Slf4j
public class JSchUtil {
/**
* exec
*/
public static final String EXEC = "exec";
public static final String SFTP = "sftp";
public static final String SHELL = "shell";
/**
* 通道Exec
*/
private ChannelExec channelExec;
/**
* 通道Sftp
*/
private ChannelSftp channelSftp;
/**
* 通道Shell
*/
private ChannelShell channelShell;
/**
* session
*/
private Session session;
/**
* 规避多线程并发
*/
private static ThreadLocal<JSchUtil> execLocal = new ThreadLocal<>();
/**
* 获取execchannel
*
* @param connectConfig 连接配置
* @return
* @throws Exception
* @throws JSchException
*/
private void init(JSchConnectConfig connectConfig) throws Exception {
log.info("连接配置信息:{}", connectConfig);
String host = connectConfig.getHost();
int port = connectConfig.getPort();
String userName = connectConfig.getUserName();
//创建JSch对象
JSch jsch = new JSch();
if (!connectConfig.getPasswordLogin().booleanValue()) {
//添加私钥(信任登录方式)
if (StringUtils.isBlank(connectConfig.getPrivateKey())) {
log.error("JSch private key 为空");
return;
}
String filePath = "";
if ("dev".equals(connectConfig.getActive())) {
filePath = this.getClass().getResource("/").getPath() + PATH_DELIMITER + connectConfig.getPrivateKey();
} else {
filePath = PATH_DELIMITER + connectConfig.getPrivateKey();
}
jsch.addIdentity(filePath);
}
session = jsch.getSession(userName, host, port);
if (log.isInfoEnabled()) {
log.info(" JSCH Session created,execHost = {}, execUserName={}", host, userName);
}
//设置密码
if (connectConfig.getPasswordLogin().booleanValue()) {
if (StringUtils.isBlank(connectConfig.getPassWord())) {
log.error("JSch password 为空");
return;
}
session.setPassword(connectConfig.getPassWord());
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
//设置超时
session.setTimeout(connectConfig.getTimeout());
session.setServerAliveInterval(5000);
//建立连接
session.connect();
if (log.isInfoEnabled()) {
log.info("JSCH Session connected.execHost = {}, execUserName={}", host, userName);
}
if (log.isInfoEnabled()) {
log.info("Connected successfully to execHost = {}, execUserName={}", host, userName);
}
}
/**
* 是否已连接
*
* @return 是否连接成功
*/
private boolean isConnected() {
return null != channelExec && channelExec.isConnected();
}
/**
* 获取本地线程存储的exec客户端
*
* @return
* @throws Exception 异常
*/
public static JSchUtil getSftpUtil(JSchConnectConfig connectConfig) throws Exception {
JSchUtil execUtil = execLocal.get();
if (null == execUtil || !execUtil.isConnected()) {
execLocal.set(new JSchUtil(connectConfig));
}
return execLocal.get();
}
/**
* 释放本地线程存储的exec客户端
*/
public static void release() {
if (null != execLocal.get()) {
execLocal.get().closeChannel();
execLocal.remove();
}
}
/**
* 构造函数
* <p>
* 非线程安全,故权限为私有
* </p>
*
* @throws Exception 异常
*/
private JSchUtil(JSchConnectConfig connectConfig) throws Exception {
super();
init(connectConfig);
}
/**
* 关闭通道
*
* @throws Exception 异常
*/
public void closeChannel() {
if (null != channelExec) {
try {
channelExec.disconnect();
} catch (Exception e) {
log.error("关闭EXEC通道发生异常:", e);
}
}
if (null != channelSftp) {
try {
channelSftp.disconnect();
} catch (Exception e) {
log.error("关闭SFTP通道发生异常:", e);
}
}
if (null != channelShell) {
try {
channelShell.disconnect();
} catch (Exception e) {
log.error("关闭SHELL通道发生异常:", e);
}
}
if (null != session) {
try {
session.disconnect();
} catch (Exception e) {
log.error("EXEC关闭 session异常:", e);
}
}
}
/**
* returnCode :0 :正常 其余表示异常 (不能活返回结果)
*
* @param commands 命令
* @return 结果
* @throws IOException IO异常
* @throws JSchException JSch异常
*/
public int execShell(List<String> commands) throws JSchException, IOException {
log.info("exec shell 执行的脚本为:{}", commands);
int returnCode = 0;
channelShell = (ChannelShell) session.openChannel(SHELL);
// 输入重定向
PipedOutputStream pipe = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(pipe);
PrintWriter pw = new PrintWriter(pipe);
channelShell.setInputStream(in);
// 输出重定向
PrintStream out = System.out;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
System.setOut(ps);
//channelShell.setOutputStream(out); 不设置输出,不然重定向异常
for (String cmd : commands) {
pw.println(cmd);
}
pw.println("exit");
pw.flush();
channelShell.connect();
// 等待关闭
while (!channelShell.isClosed()) {
}
// 重定向回来
System.out.flush();
System.setOut(out);
returnCode = channelShell.getExitStatus();
log.info("返回的结果为:{}", returnCode);
log.info(baos.toString());
channelShell.disconnect();
session.disconnect();
return returnCode;
}
/**
* returnCode :0 :正常 其余表示异常 (不能活返回结果)
*
* @param command 命令
* @return 结果
* @throws IOException IO异常
* @throws JSchException JSch异常
*/
public int execCommand(String command) throws IOException, JSchException {
log.info("exec 执行的脚本为:{}", command);
int returnCode = -1;
if (channelExec == null || channelExec.isClosed()) {
channelExec = (ChannelExec) session.openChannel(EXEC);
}
channelExec.setCommand(command);
channelExec.setInputStream(null);
channelExec.setErrStream(System.err);
InputStream in = channelExec.getInputStream();
channelExec.connect(2000);
byte[] tmp = new byte[1024];
while (true) {
while (in.available() > 0) {
int i = in.read(tmp, 0, 1024);
if (i < 0) {
break;
}
log.info("exec 返回的内容为:{}", new String(tmp, 0, i));
}
if (channelExec.isClosed()) {
if (in.available() > 0) {
continue;
}
returnCode = channelExec.getExitStatus();
log.info("exit-status:{}", returnCode);
break;
}
}
return returnCode;
}
/**
* 从 服务器 获取 数据
*
* @param filePath 文件路径
*/
public InputStream getLog(String filePath) {
try {
log.info("日志文件路径为:{}", filePath);
channelSftp = (ChannelSftp) session.openChannel(SFTP);
channelSftp.connect();
return channelSftp.get(filePath);
} catch (SftpException | JSchException e) {
log.error("下载文件异常:{}", e);
}
return null;
}
}